Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[remote-shuffle]Remote shuffle manager for spark3.0 (#1356)
* Add RemoteShuffle codebase to OAP (oap-project#1156) * Initial commit * Add pom * Update ignore * Add basic components for remote shuffle writing * Add ExternalSorter for writing to HDFS * Update actual writing class RemoteBlockObjectWriter, update related interfaces to RemoteBOW * Update ShuffleResolver to write index file and commit * Spill to remote storage * Add RemoteExternalSorter test suite * Test RemoteExternalSorter writer to HDFS * Write as .index, .data * Fix minor bugs * Add tests for RemoteShuffleBlockResolver * General remote shuffle reader * Test getBlockData in Resolver * Test HadoopFileSegmentManagedBuffer * Refactor Resolver and test suite * Fix: check existence first * Test actual reading iterator * Fix appId early getting, add basic RDD shuffle operation test * Fix bug in the condition of empty mapoutput data file, add tests to ensure this * Introduce classes for optimized shuffle writing * Optimized shuffle writer path & tests * Optimized path configurable and refactor * Introduce BypassMergeSortShuffleWriter * Implement bypass mergesort path & tests * Refactor: move HDFS connection from Utils to Resolver, add RemoteShuffleConf * Introduce RemoteAggregator and related classes, refactor RemoteSorter * Aggregator spill to remote storage, add tests for RemoteAppendOnlyMap * Fix: No closing after coping streams * Hardcode using Hadoop 2.7, truncate half write content when exception occurs, add tests for BlockObjectWriter * Fix test suite, test shuffle reader should read by block * Avoid overriding Spark classes to make default shuffle manager still work, and other refactors * Fix wrong importing, make more classes not override Spark code * Make storage master and root directory configurable * Properly get appId while running on distributed env * Lazy evaluation for getting SparkEnv vars * Add a remote bypass-merge threshold conf * Assemble Hadoop Configuration from SparkConf ++ else, instead of loading local default * Fix * Use SortShuffle's block iterator framework including shuffle blocks pre-fetch * Not loading any default config from files, and more objects reuse * Make replica configurable * Rename to ShuffleRemoteSorter * Fix: use RemoteSorter instead of ExternalSorter * Introduce DAGScheduler * With executors lost, no need to rerun map tasks thanks to remote shuffle * Require remote shuffle and external shuffle service not be enabled at the same time * When index cache enabled, fetch index files from executors who wrote them * Read index from Guava cache * UT doesn't rely on external systems * Add travis support * add code for read/write metrics (oap-project#5) * update read/write metrics * write/read metrics 功能添加完毕 * Delete compile.sh * metrics pr * metrics pr * add code about read/write metrics * add codes about shuffle read/write * add codes about shuffle read/write * remove work file * Fix wrong offset and length (oap-project#6) * Fix NettyBlockRpcServer: only cast type when remote shuffle enabled * Add micro-benchmark for shuffle writers/reader (oap-project#3) * Add SortShuffleWriterBenchmark to compare SortShuffle and RemoteShuffle interfaces * Update travis * Fix * Add other 2 writers' benchmark * Add reader micro-benchmark * Multiple stages in Travis to avoid timeout * Post benchmark results as PR comments * Fix * Debug * Debug * Fix * Beautify * Minor fix * Some renames for better understanding * Style * spark reads hadoop conf remotely (oap-project#8) ### What changes were proposed in this pull request? Originally RemoteShuffle load an empty Hadoop configuration by `val hadoopConf = new Configuration(false)`. However, Hadoop configuration needs to be loaded remotely. Some work is done in this pull request. ### How was this patch tested? By a new unit test in `org.apache.spark.shuffle.remote.RemoteShuffleManagerSuite` where a fade server is mocked to provide Hadoop configuration remotely. * Docs (oap-project#19) Add configuration and tuning guides. * remove remain/release in RemoteShuffleBlockIterator (oap-project#23) The concrete buffer implementation of ManagedBuffer might be managed outside the JVM garbage collector. If the buffer is going to be passed around to a different thread, retain/release should be called. But in RemoteShuffle, HadoopManagedBuffer is used, and it's definitely inside a JVM's lifecycle, so we don't need these operations. * Read DAOS conf from local * check style when compiling (oap-project#24) Add scala style check * Remove extra input stream layer, not needed because no buffer releasing (oap-project#25) Extra layer brings overhead. * All skip -> seek * More tests on ShuffleManager, UTs on read Iterator covering index cache enabled path * Data file asynchronous pre-fetch from multiple sources (oap-project#30) This PR resolves oap-project#16 , improving shuffle read performance by asynchronously reading whole ShuffleBlocks requests to memory(and then perform later operations) & constraining the number of reading requests in flight. In reduce stage, we observed a long time thread blocking for remote I/O to be ready. An optimization resembles vanilla Spark's can be made: send multiple block reading requests asynchronously before we actually need the data for compute, put the shuffle blocks fetched in a queue, and use the subsequent compute takes whichever block that's ready first. Constrain the requests in flight by maxBytesInFlight, maxReqsInFlight, maxBlocksInFlightPerAddress (these 3 are identical to vanilla Spark) and maxConcurrentFetches(introduced, for the maximum data file reading threads) More tests with bigger datasets, different map side partition lengths, index cache enabled/disabled, and constraints set/unset. * Refactor & style * Put index information in cache in map stage to avoid loading from storage in reduce stage (oap-project#33) * Put index info in cache in map stage if index cache is enabled * Refactor * Fix * Fix: Failing to fetch remote HDFS configurations should not crash the app (oap-project#36) Minor fix to avoid exceptions originated by 2 reasons under HDFS: 1)port unset, 2)connection failed. * Add corruption detect (oap-project#34) * Add corruption detect * Throw Exception only in task threads * Only retry the failed map tasks * Fix unsafe shuffle writer (oap-project#39) Part of oap-project#37 When memory is insufficient and spill happens, the outputs produced by unsafe shuffle writer are wrong. It's due to the bugs in mergeSpillsWithTransferTo, missed the length parameter during Streams copying. Actually this merge path doesn't apply in remote shuffle over Hadoop storage, because the NIO-based transferTo optimization may not exist. Added unit tests to ensure the correctness. * Add UTs for RemoteSorter (oap-project#40) Ensure RemoteSorter correctness. * Shuffle read metrics update even after cleaning up (oap-project#42) * Shuffle read metrics update even after cleaning up * Style * Not overidding Spark source code for better compatibility (oap-project#44) * Not overidding Spark source code for better compatibility * Fix: RpcEnv is not set in Executor * Test fix * Implement close * Catch and log Exception during RemoteShuffleTransferService's closing * Remove benchmarker * Remove the logis that will never go through under the customized TransferService, throw Exception in those branches * Set numCores using reflection, get from Dispatcher * Move package * Adding back benchmark * Style and comments * Remove reflection, let a config determine threads number for new transfer service * Not reading hdfs-site.xml when storage is DAOS * Move repository * Move repository Co-authored-by: Shuaiqi Ge <[email protected]> * Integrate remote-shuffle in CI & more docs (oap-project#1167) * CI * Remove subdir travis * Docs * More docs * Separate travis tests to different stages * Fix * Introduce new performance evaluation tool and deprecate the old micro-benchmark (oap-project#1172) * [remote-shuffle]Refactor (oap-project#1206) * Refactor * Docs * [remote-shuffle]Add docs for performance evaluation tool (#1233) * Allow producing a test jar with dependencies, refactor * Support -h help * Add docs * Disable hash-based shuffle writer by default (#1239) * Reuse file handle in reduce stage (#1234) * Remove perf evaluation tool * Update: scheduler in Spark 3.0 * Basic update for Spark3.0, updated ShuffleManager and related codes * Upper level batch fetch support, full custom metrics support(by Reynold) * Update readme * Modify Travis, empty install * Modify docs Co-authored-by: Shuaiqi Ge <[email protected]>
- Loading branch information