From f7915beba8e636cb0d3a96b15f4c944b5d835fed Mon Sep 17 00:00:00 2001 From: yao531441 <325067108@qq.com> Date: Thu, 30 Apr 2020 09:18:00 +0800 Subject: [PATCH] Branch 0.8 spark 2.4.x (#1245) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [OAP]Update pom.xml to add module oap-shuffle * [OAP]Add a script to build OAP *[oap-cache/oap]Add oap-perf-suite for oap-cache/oap (#1217) * add oap-perf-suite for OAP branch-0.8 * Clean oap file format codes *[OAP-COMMON] rename common package (#1218) * rename common package * change common version to oap version *[OAP-CACHE] Backport 0.7 to 0.8 on April 20th (#1221) * update MemoryManager to cover unified memorymanager when cache is mix (#1204) * update MemoryManager to cover unified memorymanager when cache is mix * Verify correctness OAP cache/memorymanager conf settings, update user doc accordingly * use assertThrows in unit test * [oap-cache/oap]Modified test case configs of oap-perf-suite (#1181) * Add test case configs to oap-perf-suite * Remove unnecessary test cases in oap-perf-suite * remove memory manager and cache stategy configs from oap-perf-suite * delete unnecessary changes * modified unnecessary changes * Add test case for sharing offheap/pm memorymanager in separate cache. * remove compression test case * Corrected the configs when ORC cache enabled * modified test case for only DRAM cache medium * modified blank lines * Clean oap file format related codes * Reorganize original oap to oap-cache/oap folder * Create the top level project structure * Fix a typo * organize for oap 0.8 for 2.4.x * Add RemoteShuffle codebase to OAP (#1156) * Initial commit * Add pom * Update ignore * Add basic components for remote shuffle writing * Add ExternalSorter for writing to HDFS * Update actual writing class RemoteBlockObjectWriter, update related interfaces to RemoteBOW * Update ShuffleResolver to write index file and commit * Spill to remote storage * Add RemoteExternalSorter test suite * Test RemoteExternalSorter writer to HDFS * Write as .index, .data * Fix minor bugs * Add tests for RemoteShuffleBlockResolver * General remote shuffle reader * Test getBlockData in Resolver * Test HadoopFileSegmentManagedBuffer * Refactor Resolver and test suite * Fix: check existence first * Test actual reading iterator * Fix appId early getting, add basic RDD shuffle operation test * Fix bug in the condition of empty mapoutput data file, add tests to ensure this * Introduce classes for optimized shuffle writing * Optimized shuffle writer path & tests * Optimized path configurable and refactor * Introduce BypassMergeSortShuffleWriter * Implement bypass mergesort path & tests * Refactor: move HDFS connection from Utils to Resolver, add RemoteShuffleConf * Introduce RemoteAggregator and related classes, refactor RemoteSorter * Aggregator spill to remote storage, add tests for RemoteAppendOnlyMap * Fix: No closing after coping streams * Hardcode using Hadoop 2.7, truncate half write content when exception occurs, add tests for BlockObjectWriter * Fix test suite, test shuffle reader should read by block * Avoid overriding Spark classes to make default shuffle manager still work, and other refactors * Fix wrong importing, make more classes not override Spark code * Make storage master and root directory configurable * Properly get appId while running on distributed env * Lazy evaluation for getting SparkEnv vars * Add a remote bypass-merge threshold conf * Assemble Hadoop Configuration from SparkConf ++ else, instead of loading local default * Fix * Use SortShuffle's block iterator framework including shuffle blocks pre-fetch * Not loading any default config from files, and more objects reuse * Make replica configurable * Rename to ShuffleRemoteSorter * Fix: use RemoteSorter instead of ExternalSorter * Introduce DAGScheduler * With executors lost, no need to rerun map tasks thanks to remote shuffle * Require remote shuffle and external shuffle service not be enabled at the same time * When index cache enabled, fetch index files from executors who wrote them * Read index from Guava cache * UT doesn't rely on external systems * Add travis support * add code for read/write metrics (#5) * update read/write metrics * write/read metrics 功能添加完毕 * Delete compile.sh * metrics pr * metrics pr * add code about read/write metrics * add codes about shuffle read/write * add codes about shuffle read/write * remove work file * Fix wrong offset and length (#6) * Fix NettyBlockRpcServer: only cast type when remote shuffle enabled * Add micro-benchmark for shuffle writers/reader (#3) * Add SortShuffleWriterBenchmark to compare SortShuffle and RemoteShuffle interfaces * Update travis * Fix * Add other 2 writers' benchmark * Add reader micro-benchmark * Multiple stages in Travis to avoid timeout * Post benchmark results as PR comments * Fix * Debug * Debug * Fix * Beautify * Minor fix * Some renames for better understanding * Style * spark reads hadoop conf remotely (#8) Originally RemoteShuffle load an empty Hadoop configuration by `val hadoopConf = new Configuration(false)`. However, Hadoop configuration needs to be loaded remotely. Some work is done in this pull request. By a new unit test in `org.apache.spark.shuffle.remote.RemoteShuffleManagerSuite` where a fade server is mocked to provide Hadoop configuration remotely. * Docs (#19) Add configuration and tuning guides. * remove remain/release in RemoteShuffleBlockIterator (#23) The concrete buffer implementation of ManagedBuffer might be managed outside the JVM garbage collector. If the buffer is going to be passed around to a different thread, retain/release should be called. But in RemoteShuffle, HadoopManagedBuffer is used, and it's definitely inside a JVM's lifecycle, so we don't need these operations. * Read DAOS conf from local * check style when compiling (#24) Add scala style check * Remove extra input stream layer, not needed because no buffer releasing (#25) Extra layer brings overhead. * All skip -> seek * More tests on ShuffleManager, UTs on read Iterator covering index cache enabled path * Data file asynchronous pre-fetch from multiple sources (#30) This PR resolves #16 , improving shuffle read performance by asynchronously reading whole ShuffleBlocks requests to memory(and then perform later operations) & constraining the number of reading requests in flight. In reduce stage, we observed a long time thread blocking for remote I/O to be ready. An optimization resembles vanilla Spark's can be made: send multiple block reading requests asynchronously before we actually need the data for compute, put the shuffle blocks fetched in a queue, and use the subsequent compute takes whichever block that's ready first. Constrain the requests in flight by maxBytesInFlight, maxReqsInFlight, maxBlocksInFlightPerAddress (these 3 are identical to vanilla Spark) and maxConcurrentFetches(introduced, for the maximum data file reading threads) More tests with bigger datasets, different map side partition lengths, index cache enabled/disabled, and constraints set/unset. * Refactor & style * Put index information in cache in map stage to avoid loading from storage in reduce stage (#33) * Put index info in cache in map stage if index cache is enabled * Refactor * Fix * Fix: Failing to fetch remote HDFS configurations should not crash the app (#36) Minor fix to avoid exceptions originated by 2 reasons under HDFS: 1)port unset, 2)connection failed. * Add corruption detect (#34) * Add corruption detect * Throw Exception only in task threads * Only retry the failed map tasks * Fix unsafe shuffle writer (#39) Part of #37 When memory is insufficient and spill happens, the outputs produced by unsafe shuffle writer are wrong. It's due to the bugs in mergeSpillsWithTransferTo, missed the length parameter during Streams copying. Actually this merge path doesn't apply in remote shuffle over Hadoop storage, because the NIO-based transferTo optimization may not exist. Added unit tests to ensure the correctness. * Add UTs for RemoteSorter (#40) Ensure RemoteSorter correctness. * Shuffle read metrics update even after cleaning up (#42) * Shuffle read metrics update even after cleaning up * Style * Not overidding Spark source code for better compatibility (#44) * Not overidding Spark source code for better compatibility * Fix: RpcEnv is not set in Executor * Test fix * Implement close * Catch and log Exception during RemoteShuffleTransferService's closing * Remove benchmarker * Remove the logis that will never go through under the customized TransferService, throw Exception in those branches * Set numCores using reflection, get from Dispatcher * Move package * Adding back benchmark * Style and comments * Remove reflection, let a config determine threads number for new transfer service * Not reading hdfs-site.xml when storage is DAOS * Move repository * Move repository Co-authored-by: Shuaiqi Ge <35885772+BestOreo@users.noreply.github.com> * move .travis.yml to roor Dir * Use matrix in travis-ci Use matrix in travis-ci, to support multiple modules * Integrate remote-shuffle in CI & more docs (#1167) * CI * Remove subdir travis * Docs * More docs * Separate travis tests to different stages * Fix * external cache-plasma cache support (#1200) * external cache support, resolve conflict resolve conflict get fail will throw exception add metrics catch DuplicateObjectException and fix used memory modify by comments fix * bug fix * bug fix * modify according to the comments * modify to pass CI Co-authored-by: Ji Kunshang Co-authored-by: dingyu Co-authored-by: offthewall123 * Abstract native code and jni interface to a separate module (#1207) * abstract native code and jni interface to a separate module * add a parent pom.xml * revert docs * code style * rename oap common * move properties to root pom * change version to 0.8.0 * rename to com.intel.oap (#1213) * add plasma user doc (#1215) Co-authored-by: offthewall123 * [oap-cache/oap]Add oap-perf-suite for oap-cache/oap (#1217) * add oap-perf-suite for OAP branch-0.8 * Clean oap file format codes * [OAP-COMMON] rename common package (#1218) * rename common package * change common version to oap version Co-authored-by: Yan Ma Co-authored-by: Hong Co-authored-by: Haifeng Chen Co-authored-by: Guo Chenzhao Co-authored-by: Shuaiqi Ge <35885772+BestOreo@users.noreply.github.com> Co-authored-by: zhixingheyi-tian <1564481943@qq.com> Co-authored-by: zhixingheyi-tian <41657774+zhixingheyi-tian@users.noreply.github.com> Co-authored-by: offthewall123 <302531263@qq.com> Co-authored-by: Ji Kunshang Co-authored-by: dingyu Co-authored-by: offthewall123 Co-authored-by: yeyuqiang Introduce new performance evaluation tool and deprecate the old micro-benchmark (#1172) Fix unsatisfiedLinkError (#1222) [OAP-CACHE] Support DAX KMEM mode (#1210) * Support DAX KMEM mode * move unit test to MemoryManagerConfigSuite. * code style & enhance the numa logic * Update docs and annotation * delete MemoryManagerConfigSuite.scala which has rebased by other PR #1217 * typo issue * update doc [OAP]Rename oap * [OAP]Add a README about the folder dev * [OAP]Add plasma into make-distribution.sh * [OAP]Update make-distribution.sh to add build arrow-java Co-authored-by: Yao,Qing Co-authored-by: kellyzly --- dev/make-distribution.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dev/make-distribution.sh b/dev/make-distribution.sh index 4ba3993de..07f209f48 100644 --- a/dev/make-distribution.sh +++ b/dev/make-distribution.sh @@ -160,6 +160,8 @@ function prepare_intel_arrow() { cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS="-g -O3" -DCMAKE_CXX_FLAGS="-g -O3" -DARROW_BUILD_TESTS=on -DARROW_PLASMA_JAVA_CLIENT=on -DARROW_PLASMA=on -DARROW_DEPENDENCY_SOURCE=BUNDLED .. make -j$(nproc) make install -j$(nproc) + cd $dev_path/thirdparty/arrow/java + mvn clean -q -DskipTests install }