Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-1]fix columnar wscg on empty recordbatch #36

Merged
merged 1 commit into from
Jan 14, 2021

Conversation

zhouyuan
Copy link
Collaborator

Fixes: #1
Signed-off-by: Yuan Zhou [email protected]

@github-actions
Copy link

@zhouyuan zhouyuan merged commit 8f01e2e into oap-project:master Jan 14, 2021
HongW2019 added a commit to HongW2019/gazelle_plugin that referenced this pull request Sep 2, 2021
* Add Cluster Performance TestSuite -- oap-perf-suite to OAP (oap-project#1155)

* add oap-perf-suite

* Update OapBenchmarkDataBuilder.scala

* Update OapBenchmarkDataBuilder.scala

* delete the data-generating part in oap-perf-suite

* modified pom.xml

* update data defaultProperties

* index/data cache seperation with different cache backend (oap-project#1159)

* index/data cache seperation with different cache backend

* refact with a mix cache instead of too many branchs in FiberCacheManager

* No need to differentiate index/data cache in FiberCacheManagerSuite.

* share memory manager with different cachebackend instance

* rename some parameters

Co-authored-by: knightyyq <[email protected]>

* Updated docs for branch-0.7-spark-2.4.x (oap-project#1177)

* Update OAP-User-Guide.md

* Add BinaryCache part in doc

* Updates to top-level README, Architecture Overview, and manual test. (oap-project#1180)

Signed-off-by: Kevin Putnam <[email protected]>

* update user guide organization on cache strategies part (oap-project#1169) (oap-project#1175)

* intialize memorymanager based on cache strategy (oap-project#1179)

* intialize memorymanager based on cache strategy

* correct ut name to fibercachesuite

* add unit tests for memorymanager initializing from cache strategy

* fix cache guardian use too much memory (oap-project#1174)

* fix cache guardian use too much memory

* make retry time configurable

* rename

* fix by comments

* Vmemcache doesn't support fiber cache compression (oap-project#1195)

* vmemcache doesn't support fiber cache compression, will throw an Exception(Issue 1186)

* using fiber cache compression will throw exception when initialize vmemcache

* update unit test for memory manager config options (oap-project#1197)

* update unit test for memory manager config options

* add more ut for memorymanage config options

* Update OAP-User-Guide.md on cache separation (oap-project#1198)

* Reorganize original oap to oap-cache/oap folder

* organize for oap 0.8 for 2.4.x

* Add RemoteShuffle codebase to OAP (oap-project#1156)

* Initial commit

* Add pom

* Update ignore

* Add basic components for remote shuffle writing

* Add ExternalSorter for writing to HDFS

* Update actual writing class RemoteBlockObjectWriter, update related interfaces to RemoteBOW

* Update ShuffleResolver to write index file and commit

* Spill to remote storage

* Add RemoteExternalSorter test suite

* Test RemoteExternalSorter writer to HDFS

* Write as .index, .data

* Fix minor bugs

* Add tests for RemoteShuffleBlockResolver

* General remote shuffle reader

* Test getBlockData in Resolver

* Test HadoopFileSegmentManagedBuffer

* Refactor Resolver and test suite

* Fix: check existence first

* Test actual reading iterator

* Fix appId early getting, add basic RDD shuffle operation test

* Fix bug in the condition of empty mapoutput data file, add tests to ensure this

* Introduce classes for optimized shuffle writing

* Optimized shuffle writer path & tests

* Optimized path configurable and refactor

* Introduce BypassMergeSortShuffleWriter

* Implement bypass mergesort path & tests

* Refactor: move HDFS connection from Utils to Resolver, add RemoteShuffleConf

* Introduce RemoteAggregator and related classes, refactor RemoteSorter

* Aggregator spill to remote storage, add tests for RemoteAppendOnlyMap

* Fix: No closing after coping streams

* Hardcode using Hadoop 2.7, truncate half write content when exception occurs, add tests for BlockObjectWriter

* Fix test suite, test shuffle reader should read by block

* Avoid overriding Spark classes to make default shuffle manager still work, and other refactors

* Fix wrong importing, make more classes not override Spark code

* Make storage master and root directory configurable

* Properly get appId while running on distributed env

* Lazy evaluation for getting SparkEnv vars

* Add a remote bypass-merge threshold conf

* Assemble Hadoop Configuration from SparkConf ++ else, instead of loading local default

* Fix

* Use SortShuffle's block iterator framework including shuffle blocks pre-fetch

* Not loading any default config from files, and more objects reuse

* Make replica configurable

* Rename to ShuffleRemoteSorter

* Fix: use RemoteSorter instead of ExternalSorter

* Introduce DAGScheduler

* With executors lost, no need to rerun map tasks thanks to remote shuffle

* Require remote shuffle and external shuffle service not be enabled at the same time

* When index cache enabled, fetch index files from executors who wrote them

* Read index from Guava cache

* UT doesn't rely on external systems

* Add travis support

* add code for read/write metrics (oap-project#5)

* update read/write metrics

* write/read metrics 功能添加完毕

* Delete compile.sh

* metrics pr

* metrics pr

* add code about read/write metrics

* add codes about shuffle read/write

* add codes about shuffle read/write

* remove work file

* Fix wrong offset and length (oap-project#6)

* Fix NettyBlockRpcServer: only cast type when remote shuffle enabled

* Add micro-benchmark for shuffle writers/reader (oap-project#3)

* Add SortShuffleWriterBenchmark to compare SortShuffle and RemoteShuffle interfaces

* Update travis

* Fix

* Add other 2 writers' benchmark

* Add reader micro-benchmark

* Multiple stages in Travis to avoid timeout

* Post benchmark results as PR comments

* Fix

* Debug

* Debug

* Fix

* Beautify

* Minor fix

* Some renames for better understanding

* Style

* spark reads hadoop conf remotely (oap-project#8)

### What changes were proposed in this pull request?
Originally RemoteShuffle load an empty Hadoop configuration by `val hadoopConf = new Configuration(false)`. However, Hadoop configuration needs to be loaded remotely. Some work is done in this pull request.
### How was this patch tested?
By a new unit test in `org.apache.spark.shuffle.remote.RemoteShuffleManagerSuite` where a fade server is mocked to provide Hadoop configuration remotely.

* Docs (oap-project#19)

Add configuration and tuning guides.

* remove remain/release in RemoteShuffleBlockIterator (oap-project#23)

The concrete buffer implementation of ManagedBuffer might be managed outside the JVM garbage collector. If the buffer is going to be passed around to a different thread, retain/release
should be called. But in RemoteShuffle, HadoopManagedBuffer is used, and it's definitely inside a JVM's lifecycle, so we don't need these operations.

* Read DAOS conf from local

* check style when compiling (oap-project#24)

Add scala style check

* Remove extra input stream layer, not needed because no buffer releasing (oap-project#25)

Extra layer brings overhead.

* All skip -> seek

* More tests on ShuffleManager, UTs on read Iterator covering index cache enabled path

* Data file asynchronous pre-fetch from multiple sources (oap-project#30)

   This PR resolves oap-project#16 , improving shuffle read performance by asynchronously reading whole ShuffleBlocks requests to memory(and then perform later operations) & constraining the number of reading requests in flight.

    In reduce stage, we observed a long time thread blocking for remote I/O to be ready. An optimization resembles vanilla Spark's can be made: send multiple block reading requests asynchronously before we actually need the data for compute, put the shuffle blocks fetched in a queue, and use the subsequent compute takes whichever block that's ready first.

    Constrain the requests in flight by maxBytesInFlight, maxReqsInFlight, maxBlocksInFlightPerAddress (these 3 are identical to vanilla Spark) and maxConcurrentFetches(introduced, for the maximum data file reading threads)

    More tests with bigger datasets, different map side partition lengths, index cache enabled/disabled, and constraints set/unset.

* Refactor & style

* Put index information in cache in map stage to avoid loading from storage in reduce stage (oap-project#33)

* Put index info in cache in map stage if index cache is enabled

* Refactor

* Fix

* Fix: Failing to fetch remote HDFS configurations should not crash the app (oap-project#36)

Minor fix to avoid exceptions originated by 2 reasons under HDFS: 1)port unset, 2)connection failed.

* Add corruption detect (oap-project#34)

* Add corruption detect

* Throw Exception only in task threads

* Only retry the failed map tasks

* Fix unsafe shuffle writer (oap-project#39)

Part of oap-project#37

When memory is insufficient and spill happens, the outputs produced by unsafe shuffle writer are wrong. It's due to the bugs in mergeSpillsWithTransferTo, missed the length parameter during Streams copying. Actually this merge path doesn't apply in remote shuffle over Hadoop storage, because the NIO-based transferTo optimization may not exist.

Added unit tests to ensure the correctness.

* Add UTs for RemoteSorter (oap-project#40)

Ensure RemoteSorter correctness.

* Shuffle read metrics update even after cleaning up (oap-project#42)

* Shuffle read metrics update even after cleaning up

* Style

* Not overidding Spark source code for better compatibility (oap-project#44)

* Not overidding Spark source code for better compatibility

* Fix: RpcEnv is not set in Executor

* Test fix

* Implement close

* Catch and log Exception during RemoteShuffleTransferService's closing

* Remove benchmarker

* Remove the logis that will never go through under the customized TransferService, throw Exception in those branches

* Set numCores using reflection, get from Dispatcher

* Move package

* Adding back benchmark

* Style and comments

* Remove reflection, let a config determine threads number for new transfer service

* Not reading hdfs-site.xml when storage is DAOS

* Move repository

* Move repository

Co-authored-by: Shuaiqi Ge <[email protected]>

* Use matrix in travis-ci

Use matrix in travis-ci, to support multiple modules

* Integrate remote-shuffle in CI & more docs (oap-project#1167)

* CI

* Remove subdir travis

* Docs

* More docs

* Separate travis tests to different stages

* Fix

* external cache-plasma cache support (oap-project#1200)

* external cache support, resolve conflict

resolve conflict

get fail will throw exception

add metrics

catch DuplicateObjectException and fix used memory

modify by comments

fix

* bug fix

* bug fix

* modify according to the comments

* modify to pass CI

Co-authored-by: Ji Kunshang <[email protected]>
Co-authored-by: dingyu <[email protected]>
Co-authored-by: offthewall123 <[email protected]>

* Abstract native code and jni interface to a separate module (oap-project#1207)

* abstract native code and jni interface to a separate module

* add a parent pom.xml

* revert docs

* code style

* rename oap common

* move properties to root pom

* change version to 0.8.0

* rename to com.intel.oap (oap-project#1213)

* add plasma user doc (oap-project#1215)

Co-authored-by: offthewall123 <[email protected]>

* [oap-cache/oap]Add oap-perf-suite for oap-cache/oap (#1217)

* add oap-perf-suite for OAP branch-0.8

* Clean oap file format codes

* [OAP-COMMON] rename common package (#1218)

* rename common package

* change common version to oap version

* [OAP-CACHE] Backport 0.7 to 0.8 on April 20th (#1221)

* update MemoryManager to cover unified memorymanager when cache is mix (oap-project#1204)

* update MemoryManager to cover unified memorymanager when cache is mix

* Verify correctness OAP cache/memorymanager conf settings, update user
doc accordingly

* use assertThrows in unit test

* [oap-cache/oap]Modified test case configs of oap-perf-suite (oap-project#1181)

* Add test case configs to oap-perf-suite

* Remove unnecessary test cases in oap-perf-suite

* remove memory manager and cache stategy configs from oap-perf-suite

* delete unnecessary changes

* modified unnecessary changes

* Add test case for sharing offheap/pm memorymanager in separate cache.

* remove compression test case

* Corrected the configs when ORC cache enabled

* modified test case for only DRAM cache medium

* modified blank lines

* Clean oap file format related codes

* Reorganize original oap to oap-cache/oap folder

* Create the top level project structure

* Fix a typo

* organize for oap 0.8 for 2.4.x

* Add RemoteShuffle codebase to OAP (oap-project#1156)

* Initial commit

* Add pom

* Update ignore

* Add basic components for remote shuffle writing

* Add ExternalSorter for writing to HDFS

* Update actual writing class RemoteBlockObjectWriter, update related interfaces to RemoteBOW

* Update ShuffleResolver to write index file and commit

* Spill to remote storage

* Add RemoteExternalSorter test suite

* Test RemoteExternalSorter writer to HDFS

* Write as .index, .data

* Fix minor bugs

* Add tests for RemoteShuffleBlockResolver

* General remote shuffle reader

* Test getBlockData in Resolver

* Test HadoopFileSegmentManagedBuffer

* Refactor Resolver and test suite

* Fix: check existence first

* Test actual reading iterator

* Fix appId early getting, add basic RDD shuffle operation test

* Fix bug in the condition of empty mapoutput data file, add tests to ensure this

* Introduce classes for optimized shuffle writing

* Optimized shuffle writer path & tests

* Optimized path configurable and refactor

* Introduce BypassMergeSortShuffleWriter

* Implement bypass mergesort path & tests

* Refactor: move HDFS connection from Utils to Resolver, add RemoteShuffleConf

* Introduce RemoteAggregator and related classes, refactor RemoteSorter

* Aggregator spill to remote storage, add tests for RemoteAppendOnlyMap

* Fix: No closing after coping streams

* Hardcode using Hadoop 2.7, truncate half write content when exception occurs, add tests for BlockObjectWriter

* Fix test suite, test shuffle reader should read by block

* Avoid overriding Spark classes to make default shuffle manager still work, and other refactors

* Fix wrong importing, make more classes not override Spark code

* Make storage master and root directory configurable

* Properly get appId while running on distributed env

* Lazy evaluation for getting SparkEnv vars

* Add a remote bypass-merge threshold conf

* Assemble Hadoop Configuration from SparkConf ++ else, instead of loading local default

* Fix

* Use SortShuffle's block iterator framework including shuffle blocks pre-fetch

* Not loading any default config from files, and more objects reuse

* Make replica configurable

* Rename to ShuffleRemoteSorter

* Fix: use RemoteSorter instead of ExternalSorter

* Introduce DAGScheduler

* With executors lost, no need to rerun map tasks thanks to remote shuffle

* Require remote shuffle and external shuffle service not be enabled at the same time

* When index cache enabled, fetch index files from executors who wrote them

* Read index from Guava cache

* UT doesn't rely on external systems

* Add travis support

* add code for read/write metrics (oap-project#5)

* update read/write metrics

* write/read metrics 功能添加完毕

* Delete compile.sh

* metrics pr

* metrics pr

* add code about read/write metrics

* add codes about shuffle read/write

* add codes about shuffle read/write

* remove work file

* Fix wrong offset and length (oap-project#6)

* Fix NettyBlockRpcServer: only cast type when remote shuffle enabled

* Add micro-benchmark for shuffle writers/reader (oap-project#3)

* Add SortShuffleWriterBenchmark to compare SortShuffle and RemoteShuffle interfaces

* Update travis

* Fix

* Add other 2 writers' benchmark

* Add reader micro-benchmark

* Multiple stages in Travis to avoid timeout

* Post benchmark results as PR comments

* Fix

* Debug

* Debug

* Fix

* Beautify

* Minor fix

* Some renames for better understanding

* Style

* spark reads hadoop conf remotely (oap-project#8)

### What changes were proposed in this pull request?
Originally RemoteShuffle load an empty Hadoop configuration by `val hadoopConf = new Configuration(false)`. However, Hadoop configuration needs to be loaded remotely. Some work is done in this pull request.
### How was this patch tested?
By a new unit test in `org.apache.spark.shuffle.remote.RemoteShuffleManagerSuite` where a fade server is mocked to provide Hadoop configuration remotely.

* Docs (oap-project#19)

Add configuration and tuning guides.

* remove remain/release in RemoteShuffleBlockIterator (oap-project#23)

The concrete buffer implementation of ManagedBuffer might be managed outside the JVM garbage collector. If the buffer is going to be passed around to a different thread, retain/release
should be called. But in RemoteShuffle, HadoopManagedBuffer is used, and it's definitely inside a JVM's lifecycle, so we don't need these operations.

* Read DAOS conf from local

* check style when compiling (oap-project#24)

Add scala style check

* Remove extra input stream layer, not needed because no buffer releasing (oap-project#25)

Extra layer brings overhead.

* All skip -> seek

* More tests on ShuffleManager, UTs on read Iterator covering index cache enabled path

* Data file asynchronous pre-fetch from multiple sources (oap-project#30)

   This PR resolves oap-project#16 , improving shuffle read performance by asynchronously reading whole ShuffleBlocks requests to memory(and then perform later operations) & constraining the number of reading requests in flight.

    In reduce stage, we observed a long time thread blocking for remote I/O to be ready. An optimization resembles vanilla Spark's can be made: send multiple block reading requests asynchronously before we actually need the data for compute, put the shuffle blocks fetched in a queue, and use the subsequent compute takes whichever block that's ready first.

    Constrain the requests in flight by maxBytesInFlight, maxReqsInFlight, maxBlocksInFlightPerAddress (these 3 are identical to vanilla Spark) and maxConcurrentFetches(introduced, for the maximum data file reading threads)

    More tests with bigger datasets, different map side partition lengths, index cache enabled/disabled, and constraints set/unset.

* Refactor & style

* Put index information in cache in map stage to avoid loading from storage in reduce stage (oap-project#33)

* Put index info in cache in map stage if index cache is enabled

* Refactor

* Fix

* Fix: Failing to fetch remote HDFS configurations should not crash the app (oap-project#36)

Minor fix to avoid exceptions originated by 2 reasons under HDFS: 1)port unset, 2)connection failed.

* Add corruption detect (oap-project#34)

* Add corruption detect

* Throw Exception only in task threads

* Only retry the failed map tasks

* Fix unsafe shuffle writer (oap-project#39)

Part of oap-project#37

When memory is insufficient and spill happens, the outputs produced by unsafe shuffle writer are wrong. It's due to the bugs in mergeSpillsWithTransferTo, missed the length parameter during Streams copying. Actually this merge path doesn't apply in remote shuffle over Hadoop storage, because the NIO-based transferTo optimization may not exist.

Added unit tests to ensure the correctness.

* Add UTs for RemoteSorter (oap-project#40)

Ensure RemoteSorter correctness.

* Shuffle read metrics update even after cleaning up (oap-project#42)

* Shuffle read metrics update even after cleaning up

* Style

* Not overidding Spark source code for better compatibility (oap-project#44)

* Not overidding Spark source code for better compatibility

* Fix: RpcEnv is not set in Executor

* Test fix

* Implement close

* Catch and log Exception during RemoteShuffleTransferService's closing

* Remove benchmarker

* Remove the logis that will never go through under the customized TransferService, throw Exception in those branches

* Set numCores using reflection, get from Dispatcher

* Move package

* Adding back benchmark

* Style and comments

* Remove reflection, let a config determine threads number for new transfer service

* Not reading hdfs-site.xml when storage is DAOS

* Move repository

* Move repository

Co-authored-by: Shuaiqi Ge <[email protected]>

* move .travis.yml to roor Dir

* Use matrix in travis-ci

Use matrix in travis-ci, to support multiple modules

* Integrate remote-shuffle in CI & more docs (oap-project#1167)

* CI

* Remove subdir travis

* Docs

* More docs

* Separate travis tests to different stages

* Fix

* external cache-plasma cache support (oap-project#1200)

* external cache support, resolve conflict

resolve conflict

get fail will throw exception

add metrics

catch DuplicateObjectException and fix used memory

modify by comments

fix

* bug fix

* bug fix

* modify according to the comments

* modify to pass CI

Co-authored-by: Ji Kunshang <[email protected]>
Co-authored-by: dingyu <[email protected]>
Co-authored-by: offthewall123 <[email protected]>

* Abstract native code and jni interface to a separate module (oap-project#1207)

* abstract native code and jni interface to a separate module

* add a parent pom.xml

* revert docs

* code style

* rename oap common

* move properties to root pom

* change version to 0.8.0

* rename to com.intel.oap (oap-project#1213)

* add plasma user doc (oap-project#1215)

Co-authored-by: offthewall123 <[email protected]>

* [oap-cache/oap]Add oap-perf-suite for oap-cache/oap (#1217)

* add oap-perf-suite for OAP branch-0.8

* Clean oap file format codes

* [OAP-COMMON] rename common package (#1218)

* rename common package

* change common version to oap version

Co-authored-by: Yan Ma <[email protected]>
Co-authored-by: Hong <[email protected]>
Co-authored-by: Haifeng Chen <[email protected]>
Co-authored-by: Guo Chenzhao <[email protected]>
Co-authored-by: Shuaiqi Ge <[email protected]>
Co-authored-by: zhixingheyi-tian <[email protected]>
Co-authored-by: zhixingheyi-tian <[email protected]>
Co-authored-by: offthewall123 <[email protected]>
Co-authored-by: Ji Kunshang <[email protected]>
Co-authored-by: dingyu <[email protected]>
Co-authored-by: offthewall123 <[email protected]>
Co-authored-by: yeyuqiang <[email protected]>

* Introduce new performance evaluation tool and deprecate the old micro-benchmark (oap-project#1172)

* Fix unsatisfiedLinkError (#1222)

* [OAP-CACHE] Support DAX KMEM mode (oap-project#1210)

* Support DAX KMEM mode

* move unit test to MemoryManagerConfigSuite.

* code style & enhance the numa logic

* Update docs and annotation

* delete MemoryManagerConfigSuite.scala which has rebased by other PR #1217

* typo issue

* update doc

* [remote-shuffle]Refactor (oap-project#1206)

* Refactor

* Docs

* [OAP]Add a script to build OAP  (#1224)

* [OAP]Update pom.xml to add module oap-shuffle

* [OAP]Add a script to build OAP

*[oap-cache/oap]Add oap-perf-suite for oap-cache/oap (#1217)

* add oap-perf-suite for OAP branch-0.8

* Clean oap file format codes

*[OAP-COMMON] rename common package (#1218)

* rename common package

* change common version to oap version

*[OAP-CACHE] Backport 0.7 to 0.8 on April 20th (#1221)

* update MemoryManager to cover unified memorymanager when cache is mix (oap-project#1204)

* update MemoryManager to cover unified memorymanager when cache is mix

* Verify correctness OAP cache/memorymanager conf settings, update user
doc accordingly

* use assertThrows in unit test

* [oap-cache/oap]Modified test case configs of oap-perf-suite (oap-project#1181)

* Add test case configs to oap-perf-suite

* Remove unnecessary test cases in oap-perf-suite

* remove memory manager and cache stategy configs from oap-perf-suite

* delete unnecessary changes

* modified unnecessary changes

* Add test case for sharing offheap/pm memorymanager in separate cache.

* remove compression test case

* Corrected the configs when ORC cache enabled

* modified test case for only DRAM cache medium

* modified blank lines

* Clean oap file format related codes

* Reorganize original oap to oap-cache/oap folder

* Create the top level project structure

* Fix a typo

* organize for oap 0.8 for 2.4.x

* Add RemoteShuffle codebase to OAP (oap-project#1156)

* Initial commit

* Add pom

* Update ignore

* Add basic components for remote shuffle writing

* Add ExternalSorter for writing to HDFS

* Update actual writing class RemoteBlockObjectWriter, update related interfaces to RemoteBOW

* Update ShuffleResolver to write index file and commit

* Spill to remote storage

* Add RemoteExternalSorter test suite

* Test RemoteExternalSorter writer to HDFS

* Write as .index, .data

* Fix minor bugs

* Add tests for RemoteShuffleBlockResolver

* General remote shuffle reader

* Test getBlockData in Resolver

* Test HadoopFileSegmentManagedBuffer

* Refactor Resolver and test suite

* Fix: check existence first

* Test actual reading iterator

* Fix appId early getting, add basic RDD shuffle operation test

* Fix bug in the condition of empty mapoutput data file, add tests to ensure this

* Introduce classes for optimized shuffle writing

* Optimized shuffle writer path & tests

* Optimized path configurable and refactor

* Introduce BypassMergeSortShuffleWriter

* Implement bypass mergesort path & tests

* Refactor: move HDFS connection from Utils to Resolver, add RemoteShuffleConf

* Introduce RemoteAggregator and related classes, refactor RemoteSorter

* Aggregator spill to remote storage, add tests for RemoteAppendOnlyMap

* Fix: No closing after coping streams

* Hardcode using Hadoop 2.7, truncate half write content when exception occurs, add tests for BlockObjectWriter

* Fix test suite, test shuffle reader should read by block

* Avoid overriding Spark classes to make default shuffle manager still work, and other refactors

* Fix wrong importing, make more classes not override Spark code

* Make storage master and root directory configurable

* Properly get appId while running on distributed env

* Lazy evaluation for getting SparkEnv vars

* Add a remote bypass-merge threshold conf

* Assemble Hadoop Configuration from SparkConf ++ else, instead of loading local default

* Fix

* Use SortShuffle's block iterator framework including shuffle blocks pre-fetch

* Not loading any default config from files, and more objects reuse

* Make replica configurable

* Rename to ShuffleRemoteSorter

* Fix: use RemoteSorter instead of ExternalSorter

* Introduce DAGScheduler

* With executors lost, no need to rerun map tasks thanks to remote shuffle

* Require remote shuffle and external shuffle service not be enabled at the same time

* When index cache enabled, fetch index files from executors who wrote them

* Read index from Guava cache

* UT doesn't rely on external systems

* Add travis support

* add code for read/write metrics (oap-project#5)

* update read/write metrics

* write/read metrics 功能添加完毕

* Delete compile.sh

* metrics pr

* metrics pr

* add code about read/write metrics

* add codes about shuffle read/write

* add codes about shuffle read/write

* remove work file

* Fix wrong offset and length (oap-project#6)

* Fix NettyBlockRpcServer: only cast type when remote shuffle enabled

* Add micro-benchmark for shuffle writers/reader (oap-project#3)

* Add SortShuffleWriterBenchmark to compare SortShuffle and RemoteShuffle interfaces

* Update travis

* Fix

* Add other 2 writers' benchmark

* Add reader micro-benchmark

* Multiple stages in Travis to avoid timeout

* Post benchmark results as PR comments

* Fix

* Debug

* Debug

* Fix

* Beautify

* Minor fix

* Some renames for better understanding

* Style

* spark reads hadoop conf remotely (oap-project#8)

Originally RemoteShuffle load an empty Hadoop configuration by `val hadoopConf = new Configuration(false)`. However, Hadoop configuration needs to be loaded remotely. Some work is done in this pull request.
By a new unit test in `org.apache.spark.shuffle.remote.RemoteShuffleManagerSuite` where a fade server is mocked to provide Hadoop configuration remotely.

* Docs (oap-project#19)

Add configuration and tuning guides.

* remove remain/release in RemoteShuffleBlockIterator (oap-project#23)

The concrete buffer implementation of ManagedBuffer might be managed outside the JVM garbage collector. If the buffer is going to be passed around to a different thread, retain/release
should be called. But in RemoteShuffle, HadoopManagedBuffer is used, and it's definitely inside a JVM's lifecycle, so we don't need these operations.

* Read DAOS conf from local

* check style when compiling (oap-project#24)

Add scala style check

* Remove extra input stream layer, not needed because no buffer releasing (oap-project#25)

Extra layer brings overhead.

* All skip -> seek

* More tests on ShuffleManager, UTs on read Iterator covering index cache enabled path

* Data file asynchronous pre-fetch from multiple sources (oap-project#30)

   This PR resolves oap-project#16 , improving shuffle read performance by asynchronously reading whole ShuffleBlocks requests to memory(and then perform later operations) & constraining the number of reading requests in flight.

    In reduce stage, we observed a long time thread blocking for remote I/O to be ready. An optimization resembles vanilla Spark's can be made: send multiple block reading requests asynchronously before we actually need the data for compute, put the shuffle blocks fetched in a queue, and use the subsequent compute takes whichever block that's ready first.

    Constrain the requests in flight by maxBytesInFlight, maxReqsInFlight, maxBlocksInFlightPerAddress (these 3 are identical to vanilla Spark) and maxConcurrentFetches(introduced, for the maximum data file reading threads)

    More tests with bigger datasets, different map side partition lengths, index cache enabled/disabled, and constraints set/unset.

* Refactor & style

* Put index information in cache in map stage to avoid loading from storage in reduce stage (oap-project#33)

* Put index info in cache in map stage if index cache is enabled

* Refactor

* Fix

* Fix: Failing to fetch remote HDFS configurations should not crash the app (oap-project#36)

Minor fix to avoid exceptions originated by 2 reasons under HDFS: 1)port unset, 2)connection failed.

* Add corruption detect (oap-project#34)

* Add corruption detect

* Throw Exception only in task threads

* Only retry the failed map tasks

* Fix unsafe shuffle writer (oap-project#39)

Part of oap-project#37

When memory is insufficient and spill happens, the outputs produced by unsafe shuffle writer are wrong. It's due to the bugs in mergeSpillsWithTransferTo, missed the length parameter during Streams copying. Actually this merge path doesn't apply in remote shuffle over Hadoop storage, because the NIO-based transferTo optimization may not exist.

Added unit tests to ensure the correctness.

* Add UTs for RemoteSorter (oap-project#40)

Ensure RemoteSorter correctness.

* Shuffle read metrics update even after cleaning up (oap-project#42)

* Shuffle read metrics update even after cleaning up

* Style

* Not overidding Spark source code for better compatibility (oap-project#44)

* Not overidding Spark source code for better compatibility

* Fix: RpcEnv is not set in Executor

* Test fix

* Implement close

* Catch and log Exception during RemoteShuffleTransferService's closing

* Remove benchmarker

* Remove the logis that will never go through under the customized TransferService, throw Exception in those branches

* Set numCores using reflection, get from Dispatcher

* Move package

* Adding back benchmark

* Style and comments

* Remove reflection, let a config determine threads number for new transfer service

* Not reading hdfs-site.xml when storage is DAOS

* Move repository

* Move repository

Co-authored-by: Shuaiqi Ge <[email protected]>

* move .travis.yml to roor Dir

* Use matrix in travis-ci

Use matrix in travis-ci, to support multiple modules

* Integrate remote-shuffle in CI & more docs (oap-project#1167)

* CI

* Remove subdir travis

* Docs

* More docs

* Separate travis tests to different stages

* Fix

* external cache-plasma cache support (oap-project#1200)

* external cache support, resolve conflict

resolve conflict

get fail will throw exception

add metrics

catch DuplicateObjectException and fix used memory

modify by comments

fix

* bug fix

* bug fix

* modify according to the comments

* modify to pass CI

Co-authored-by: Ji Kunshang <[email protected]>
Co-authored-by: dingyu <[email protected]>
Co-authored-by: offthewall123 <[email protected]>

* Abstract native code and jni interface to a separate module (oap-project#1207)

* abstract native code and jni interface to a separate module

* add a parent pom.xml

* revert docs

* code style

* rename oap common

* move properties to root pom

* change version to 0.8.0

* rename to com.intel.oap (oap-project#1213)

* add plasma user doc (oap-project#1215)

Co-authored-by: offthewall123 <[email protected]>

* [oap-cache/oap]Add oap-perf-suite for oap-cache/oap (#1217)

* add oap-perf-suite for OAP branch-0.8

* Clean oap file format codes

* [OAP-COMMON] rename common package (#1218)

* rename common package

* change common version to oap version

Co-authored-by: Yan Ma <[email protected]>
Co-authored-by: Hong <[email protected]>
Co-authored-by: Haifeng Chen <[email protected]>
Co-authored-by: Guo Chenzhao <[email protected]>
Co-authored-by: Shuaiqi Ge <[email protected]>
Co-authored-by: zhixingheyi-tian <[email protected]>
Co-authored-by: zhixingheyi-tian <[email protected]>
Co-authored-by: offthewall123 <[email protected]>
Co-authored-by: Ji Kunshang <[email protected]>
Co-authored-by: dingyu <[email protected]>
Co-authored-by: offthewall123 <[email protected]>
Co-authored-by: yeyuqiang <[email protected]>

Introduce new performance evaluation tool and deprecate the old micro-benchmark (oap-project#1172)

Fix unsatisfiedLinkError (#1222)

[OAP-CACHE] Support DAX KMEM mode (oap-project#1210)

* Support DAX KMEM mode

* move unit test to MemoryManagerConfigSuite.

* code style & enhance the numa logic

* Update docs and annotation

* delete MemoryManagerConfigSuite.scala which has rebased by other PR #1217

* typo issue

* update doc

[OAP]Rename oap

* [OAP]Add a README about the folder dev

* [OAP]Add plasma into make-distribution.sh

Co-authored-by: Yao,Qing <[email protected]>
Co-authored-by: kellyzly <[email protected]>

* [OAP-CACHE] Update travis v2 branch 0.8 spark 2.4.x -test (#1219)

* update .travis.yml-v1

* update .travis.yml-v2 and trigger travis

* update .travis.yml-v3

* update .travis.yml-v4

* update .travis.yml-v5

* update .travis.yml-v6

* update .travis.yml-v7

* update .travis.yml-v8

* update .travis.yml-v9

* update .travis.yml-v10

* update .travis.yml-v17

* update .travis.yml-v18

* update .travis.yml-v19 and use plasma-1.0.0-SNAPSHOT

* update .travis.yml-v20 and use plasma-1.0.0-SNAPSHOT

* update .travis.yml-v21 and use plasma-1.0.0-SNAPSHOT

* update .travis.yml-v22 and use plasma-1.0.0-SNAPSHOT

* update .travis.yml-v23 and use plasma-1.0.0-SNAPSHOT

* update .travis.yml-v24 and use plasma-1.0.0-SNAPSHOT

* update .travis.yml-v25 and use plasma-1.0.0-SNAPSHOT

* update .travis.yml-v26 and use plasma-1.0.0-SNAPSHOT modify plasmaClient api name

* update .travis.yml-v36
use plasma-1.0.0-SNAPSHOT
modify plasmaClient api name
log redirect

* update .travis.yml-v37
use plasma-1.0.0-SNAPSHOT
modify plasmaClient api name
log redirect
define stage

* update .travis.yml-v38
use plasma-1.0.0-SNAPSHOT
modify plasmaClient api name
log redirect
define stage

* update .travis.yml-v39
use plasma-1.0.0-SNAPSHOT
modify plasmaClient api name
log redirect
define stage
remove comments and add TODO

* update .travis.yml-v40
use plasma-1.0.0-SNAPSHOT
modify plasmaClient api name
log redirect
define stage
remove comments and add TODO
build with vmemcache

* update .travis.yml-v40
use plasma-1.0.0-SNAPSHOT
modify plasmaClient api name
log redirect
define stage
remove comments and add TODO
build with vmemcache
quite mvn log

* update .travis.yml-v42
use plasma-1.0.0-SNAPSHOT
modify plasmaClient api name
log redirect
define stage
remove comments and add TODO
build with vmemcache
quite mvn log
define install phase in remote shuffle job

* update .travis.yml-v43
use plasma-1.0.0-SNAPSHOT
modify plasmaClient api name
log redirect
define stage
remove comments and add TODO
build with vmemcache
quite mvn log
define install phase in remote shuffle job
put install arrow script to dev/install_arrow.sh

* make install_arrow.sh executable

* update .travis.yml-v45
use plasma-1.0.0-SNAPSHOT
modify plasmaClient api name
log redirect
define stage
remove comments and add TODO
build with vmemcache
quite mvn log
define install phase in remote shuffle job
put install arrow script to dev/install_arrow.sh

* update .travis.yml-v46
use plasma-1.0.0-SNAPSHOT
modify plasmaClient api name
log redirect
define stage
remove comments and add TODO
build with vmemcache
quite mvn log
define install phase in remote shuffle job
put install arrow script to dev/install_arrow.sh

* update .travis.yml-v47
use plasma-1.0.0-SNAPSHOT
modify plasmaClient api name
define stage
remove comments and add TODO
build with vmemcache
quite mvn log
define install phase in remote shuffle job
put install arrow script to dev/install_arrow.sh

* update .travis.yml-v48
put oap-shuffle and oap-cache in one stage

* make install_memkind.sh install_vmemcache.sh executable

* update .travis.yml-v49

* make install_memkind.sh install_vmemcache.sh executable

* update .travis.yml-v50

* update .travis.yml-v51

* update .travis.yml-v52

* update .travis.yml-v53
modify according to comments
https://github.com/Intel-bigdata/OAP/pull/1219#discussion_r414232007
https://github.com/Intel-bigdata/OAP/pull/1219#discussion_r414234161

* update .travis.yml-v54
Test no install step in remote-shuffle job

* update .travis.yml-v55
add install step in remote-shuffle job

* update .travis.yml-v56

* update .travis.yml-v57
use Intel-bigdata/arrow

* update .travis.yml-v58
without install step in remote-shuffle job

* update .travis.yml-v59
define an empty install step in remote-shuffle job

* update .travis.yml-v60
add commments

Co-authored-by: offthewall123 <[email protected]>

* [remote-shuffle]Add docs for performance evaluation tool (#1233)

* Allow producing a test jar with dependencies, refactor

* Support -h help

* Add docs

* Disable hash-based shuffle writer by default (#1239)

* Reuse file handle in reduce stage (#1234)

* [OAP-CACHE-OAP] Rename nonevict to noevict branch 0.8 (#1241)

* resolve conflict

* resolve conflict

Co-authored-by: offthewall123 <[email protected]>

* Ignore idea files

* Revert "organize for oap 0.8 for 2.4.x"

This reverts commit 2f83a600639a34c0e433739d2e4d2a2b45a47913.

* Revert "Reuse file handle in reduce stage (#1234)"

This reverts commit 4a3886dd332bc4346d9618127ac7d5303b746d5b.

* Revert "Disable hash-based shuffle writer by default (#1239)"

This reverts commit 7ebc911a6e19bac1830f9e9363c2e8efd3eae8c6.

* Revert "[remote-shuffle]Add docs for performance evaluation tool (#1233)"

This reverts commit cb5ce5f54c925fc113234bd613244ba97adb0c1f.

* Revert "[remote-shuffle]Refactor (oap-project#1206)"

This reverts commit ad8f3770d214004275a144e6dcb0f3f738efcb49.

* Revert "Introduce new performance evaluation tool and deprecate the old micro-benchmark (oap-project#1172)"

This reverts commit 7059ac1cc5802c6930d1419e55b511cdafdbfb6f.

* Revert "Integrate remote-shuffle in CI & more docs (oap-project#1167)"

This reverts commit 7ffe3cfe26274968fccc39bd2f1ec46bb49f81d9.

* Revert "Add RemoteShuffle codebase to OAP (oap-project#1156)"

This reverts commit 622b7ee43489dcb5a006e1841f396f0dcd3f5353.

* Remove Remote shuffle POM

* Remove remote shuffle travis

Co-authored-by: Hong <[email protected]>
Co-authored-by: yeyuqiang <[email protected]>
Co-authored-by: knightyyq <[email protected]>
Co-authored-by: Kevin Putnam <[email protected]>
Co-authored-by: Yan Ma <[email protected]>
Co-authored-by: jikunshang <[email protected]>
Co-authored-by: Haifeng Chen <[email protected]>
Co-authored-by: Guo Chenzhao <[email protected]>
Co-authored-by: Shuaiqi Ge <[email protected]>
Co-authored-by: zhixingheyi-tian <[email protected]>
Co-authored-by: offthewall123 <[email protected]>
Co-authored-by: dingyu <[email protected]>
Co-authored-by: offthewall123 <[email protected]>
Co-authored-by: zhixingheyi-tian <[email protected]>
Co-authored-by: yao531441 <[email protected]>
Co-authored-by: Yao,Qing <[email protected]>
Co-authored-by: kellyzly <[email protected]>
HongW2019 pushed a commit to HongW2019/gazelle_plugin that referenced this pull request Sep 2, 2021
* [OAP]Update pom.xml to add module oap-shuffle

* [OAP]Add a script to build OAP

*[oap-cache/oap]Add oap-perf-suite for oap-cache/oap (#1217)

* add oap-perf-suite for OAP branch-0.8

* Clean oap file format codes

*[OAP-COMMON] rename common package (#1218)

* rename common package

* change common version to oap version

*[OAP-CACHE] Backport 0.7 to 0.8 on April 20th (#1221)

* update MemoryManager to cover unified memorymanager when cache is mix (oap-project#1204)

* update MemoryManager to cover unified memorymanager when cache is mix

* Verify correctness OAP cache/memorymanager conf settings, update user
doc accordingly

* use assertThrows in unit test

* [oap-cache/oap]Modified test case configs of oap-perf-suite (oap-project#1181)

* Add test case configs to oap-perf-suite

* Remove unnecessary test cases in oap-perf-suite

* remove memory manager and cache stategy configs from oap-perf-suite

* delete unnecessary changes

* modified unnecessary changes

* Add test case for sharing offheap/pm memorymanager in separate cache.

* remove compression test case

* Corrected the configs when ORC cache enabled

* modified test case for only DRAM cache medium

* modified blank lines

* Clean oap file format related codes

* Reorganize original oap to oap-cache/oap folder

* Create the top level project structure

* Fix a typo

* organize for oap 0.8 for 2.4.x

* Add RemoteShuffle codebase to OAP (oap-project#1156)

* Initial commit

* Add pom

* Update ignore

* Add basic components for remote shuffle writing

* Add ExternalSorter for writing to HDFS

* Update actual writing class RemoteBlockObjectWriter, update related interfaces to RemoteBOW

* Update ShuffleResolver to write index file and commit

* Spill to remote storage

* Add RemoteExternalSorter test suite

* Test RemoteExternalSorter writer to HDFS

* Write as .index, .data

* Fix minor bugs

* Add tests for RemoteShuffleBlockResolver

* General remote shuffle reader

* Test getBlockData in Resolver

* Test HadoopFileSegmentManagedBuffer

* Refactor Resolver and test suite

* Fix: check existence first

* Test actual reading iterator

* Fix appId early getting, add basic RDD shuffle operation test

* Fix bug in the condition of empty mapoutput data file, add tests to ensure this

* Introduce classes for optimized shuffle writing

* Optimized shuffle writer path & tests

* Optimized path configurable and refactor

* Introduce BypassMergeSortShuffleWriter

* Implement bypass mergesort path & tests

* Refactor: move HDFS connection from Utils to Resolver, add RemoteShuffleConf

* Introduce RemoteAggregator and related classes, refactor RemoteSorter

* Aggregator spill to remote storage, add tests for RemoteAppendOnlyMap

* Fix: No closing after coping streams

* Hardcode using Hadoop 2.7, truncate half write content when exception occurs, add tests for BlockObjectWriter

* Fix test suite, test shuffle reader should read by block

* Avoid overriding Spark classes to make default shuffle manager still work, and other refactors

* Fix wrong importing, make more classes not override Spark code

* Make storage master and root directory configurable

* Properly get appId while running on distributed env

* Lazy evaluation for getting SparkEnv vars

* Add a remote bypass-merge threshold conf

* Assemble Hadoop Configuration from SparkConf ++ else, instead of loading local default

* Fix

* Use SortShuffle's block iterator framework including shuffle blocks pre-fetch

* Not loading any default config from files, and more objects reuse

* Make replica configurable

* Rename to ShuffleRemoteSorter

* Fix: use RemoteSorter instead of ExternalSorter

* Introduce DAGScheduler

* With executors lost, no need to rerun map tasks thanks to remote shuffle

* Require remote shuffle and external shuffle service not be enabled at the same time

* When index cache enabled, fetch index files from executors who wrote them

* Read index from Guava cache

* UT doesn't rely on external systems

* Add travis support

* add code for read/write metrics (oap-project#5)

* update read/write metrics

* write/read metrics 功能添加完毕

* Delete compile.sh

* metrics pr

* metrics pr

* add code about read/write metrics

* add codes about shuffle read/write

* add codes about shuffle read/write

* remove work file

* Fix wrong offset and length (oap-project#6)

* Fix NettyBlockRpcServer: only cast type when remote shuffle enabled

* Add micro-benchmark for shuffle writers/reader (oap-project#3)

* Add SortShuffleWriterBenchmark to compare SortShuffle and RemoteShuffle interfaces

* Update travis

* Fix

* Add other 2 writers' benchmark

* Add reader micro-benchmark

* Multiple stages in Travis to avoid timeout

* Post benchmark results as PR comments

* Fix

* Debug

* Debug

* Fix

* Beautify

* Minor fix

* Some renames for better understanding

* Style

* spark reads hadoop conf remotely (oap-project#8)

Originally RemoteShuffle load an empty Hadoop configuration by `val hadoopConf = new Configuration(false)`. However, Hadoop configuration needs to be loaded remotely. Some work is done in this pull request.
By a new unit test in `org.apache.spark.shuffle.remote.RemoteShuffleManagerSuite` where a fade server is mocked to provide Hadoop configuration remotely.

* Docs (oap-project#19)

Add configuration and tuning guides.

* remove remain/release in RemoteShuffleBlockIterator (oap-project#23)

The concrete buffer implementation of ManagedBuffer might be managed outside the JVM garbage collector. If the buffer is going to be passed around to a different thread, retain/release
should be called. But in RemoteShuffle, HadoopManagedBuffer is used, and it's definitely inside a JVM's lifecycle, so we don't need these operations.

* Read DAOS conf from local

* check style when compiling (oap-project#24)

Add scala style check

* Remove extra input stream layer, not needed because no buffer releasing (oap-project#25)

Extra layer brings overhead.

* All skip -> seek

* More tests on ShuffleManager, UTs on read Iterator covering index cache enabled path

* Data file asynchronous pre-fetch from multiple sources (oap-project#30)

   This PR resolves oap-project#16 , improving shuffle read performance by asynchronously reading whole ShuffleBlocks requests to memory(and then perform later operations) & constraining the number of reading requests in flight.

    In reduce stage, we observed a long time thread blocking for remote I/O to be ready. An optimization resembles vanilla Spark's can be made: send multiple block reading requests asynchronously before we actually need the data for compute, put the shuffle blocks fetched in a queue, and use the subsequent compute takes whichever block that's ready first.

    Constrain the requests in flight by maxBytesInFlight, maxReqsInFlight, maxBlocksInFlightPerAddress (these 3 are identical to vanilla Spark) and maxConcurrentFetches(introduced, for the maximum data file reading threads)

    More tests with bigger datasets, different map side partition lengths, index cache enabled/disabled, and constraints set/unset.

* Refactor & style

* Put index information in cache in map stage to avoid loading from storage in reduce stage (oap-project#33)

* Put index info in cache in map stage if index cache is enabled

* Refactor

* Fix

* Fix: Failing to fetch remote HDFS configurations should not crash the app (oap-project#36)

Minor fix to avoid exceptions originated by 2 reasons under HDFS: 1)port unset, 2)connection failed.

* Add corruption detect (oap-project#34)

* Add corruption detect

* Throw Exception only in task threads

* Only retry the failed map tasks

* Fix unsafe shuffle writer (oap-project#39)

Part of oap-project#37

When memory is insufficient and spill happens, the outputs produced by unsafe shuffle writer are wrong. It's due to the bugs in mergeSpillsWithTransferTo, missed the length parameter during Streams copying. Actually this merge path doesn't apply in remote shuffle over Hadoop storage, because the NIO-based transferTo optimization may not exist.

Added unit tests to ensure the correctness.

* Add UTs for RemoteSorter (oap-project#40)

Ensure RemoteSorter correctness.

* Shuffle read metrics update even after cleaning up (oap-project#42)

* Shuffle read metrics update even after cleaning up

* Style

* Not overidding Spark source code for better compatibility (oap-project#44)

* Not overidding Spark source code for better compatibility

* Fix: RpcEnv is not set in Executor

* Test fix

* Implement close

* Catch and log Exception during RemoteShuffleTransferService's closing

* Remove benchmarker

* Remove the logis that will never go through under the customized TransferService, throw Exception in those branches

* Set numCores using reflection, get from Dispatcher

* Move package

* Adding back benchmark

* Style and comments

* Remove reflection, let a config determine threads number for new transfer service

* Not reading hdfs-site.xml when storage is DAOS

* Move repository

* Move repository

Co-authored-by: Shuaiqi Ge <[email protected]>

* move .travis.yml to roor Dir

* Use matrix in travis-ci

Use matrix in travis-ci, to support multiple modules

* Integrate remote-shuffle in CI & more docs (oap-project#1167)

* CI

* Remove subdir travis

* Docs

* More docs

* Separate travis tests to different stages

* Fix

* external cache-plasma cache support (oap-project#1200)

* external cache support, resolve conflict

resolve conflict

get fail will throw exception

add metrics

catch DuplicateObjectException and fix used memory

modify by comments

fix

* bug fix

* bug fix

* modify according to the comments

* modify to pass CI

Co-authored-by: Ji Kunshang <[email protected]>
Co-authored-by: dingyu <[email protected]>
Co-authored-by: offthewall123 <[email protected]>

* Abstract native code and jni interface to a separate module (oap-project#1207)

* abstract native code and jni interface to a separate module

* add a parent pom.xml

* revert docs

* code style

* rename oap common

* move properties to root pom

* change version to 0.8.0

* rename to com.intel.oap (oap-project#1213)

* add plasma user doc (oap-project#1215)

Co-authored-by: offthewall123 <[email protected]>

* [oap-cache/oap]Add oap-perf-suite for oap-cache/oap (#1217)

* add oap-perf-suite for OAP branch-0.8

* Clean oap file format codes

* [OAP-COMMON] rename common package (#1218)

* rename common package

* change common version to oap version

Co-authored-by: Yan Ma <[email protected]>
Co-authored-by: Hong <[email protected]>
Co-authored-by: Haifeng Chen <[email protected]>
Co-authored-by: Guo Chenzhao <[email protected]>
Co-authored-by: Shuaiqi Ge <[email protected]>
Co-authored-by: zhixingheyi-tian <[email protected]>
Co-authored-by: zhixingheyi-tian <[email protected]>
Co-authored-by: offthewall123 <[email protected]>
Co-authored-by: Ji Kunshang <[email protected]>
Co-authored-by: dingyu <[email protected]>
Co-authored-by: offthewall123 <[email protected]>
Co-authored-by: yeyuqiang <[email protected]>

Introduce new performance evaluation tool and deprecate the old micro-benchmark (oap-project#1172)

Fix unsatisfiedLinkError (#1222)

[OAP-CACHE] Support DAX KMEM mode (oap-project#1210)

* Support DAX KMEM mode

* move unit test to MemoryManagerConfigSuite.

* code style & enhance the numa logic

* Update docs and annotation

* delete MemoryManagerConfigSuite.scala which has rebased by other PR #1217

* typo issue

* update doc

[OAP]Rename oap

* [OAP]Add a README about the folder dev

* [OAP]Add plasma into make-distribution.sh

* [OAP]Update make-distribution.sh to add build arrow-java

Co-authored-by: Yao,Qing <[email protected]>
Co-authored-by: kellyzly <[email protected]>
HongW2019 pushed a commit to HongW2019/gazelle_plugin that referenced this pull request Sep 2, 2021
* Add RemoteShuffle codebase to OAP (oap-project#1156)

* Initial commit

* Add pom

* Update ignore

* Add basic components for remote shuffle writing

* Add ExternalSorter for writing to HDFS

* Update actual writing class RemoteBlockObjectWriter, update related interfaces to RemoteBOW

* Update ShuffleResolver to write index file and commit

* Spill to remote storage

* Add RemoteExternalSorter test suite

* Test RemoteExternalSorter writer to HDFS

* Write as .index, .data

* Fix minor bugs

* Add tests for RemoteShuffleBlockResolver

* General remote shuffle reader

* Test getBlockData in Resolver

* Test HadoopFileSegmentManagedBuffer

* Refactor Resolver and test suite

* Fix: check existence first

* Test actual reading iterator

* Fix appId early getting, add basic RDD shuffle operation test

* Fix bug in the condition of empty mapoutput data file, add tests to ensure this

* Introduce classes for optimized shuffle writing

* Optimized shuffle writer path & tests

* Optimized path configurable and refactor

* Introduce BypassMergeSortShuffleWriter

* Implement bypass mergesort path & tests

* Refactor: move HDFS connection from Utils to Resolver, add RemoteShuffleConf

* Introduce RemoteAggregator and related classes, refactor RemoteSorter

* Aggregator spill to remote storage, add tests for RemoteAppendOnlyMap

* Fix: No closing after coping streams

* Hardcode using Hadoop 2.7, truncate half write content when exception occurs, add tests for BlockObjectWriter

* Fix test suite, test shuffle reader should read by block

* Avoid overriding Spark classes to make default shuffle manager still work, and other refactors

* Fix wrong importing, make more classes not override Spark code

* Make storage master and root directory configurable

* Properly get appId while running on distributed env

* Lazy evaluation for getting SparkEnv vars

* Add a remote bypass-merge threshold conf

* Assemble Hadoop Configuration from SparkConf ++ else, instead of loading local default

* Fix

* Use SortShuffle's block iterator framework including shuffle blocks pre-fetch

* Not loading any default config from files, and more objects reuse

* Make replica configurable

* Rename to ShuffleRemoteSorter

* Fix: use RemoteSorter instead of ExternalSorter

* Introduce DAGScheduler

* With executors lost, no need to rerun map tasks thanks to remote shuffle

* Require remote shuffle and external shuffle service not be enabled at the same time

* When index cache enabled, fetch index files from executors who wrote them

* Read index from Guava cache

* UT doesn't rely on external systems

* Add travis support

* add code for read/write metrics (oap-project#5)

* update read/write metrics

* write/read metrics 功能添加完毕

* Delete compile.sh

* metrics pr

* metrics pr

* add code about read/write metrics

* add codes about shuffle read/write

* add codes about shuffle read/write

* remove work file

* Fix wrong offset and length (oap-project#6)

* Fix NettyBlockRpcServer: only cast type when remote shuffle enabled

* Add micro-benchmark for shuffle writers/reader (oap-project#3)

* Add SortShuffleWriterBenchmark to compare SortShuffle and RemoteShuffle interfaces

* Update travis

* Fix

* Add other 2 writers' benchmark

* Add reader micro-benchmark

* Multiple stages in Travis to avoid timeout

* Post benchmark results as PR comments

* Fix

* Debug

* Debug

* Fix

* Beautify

* Minor fix

* Some renames for better understanding

* Style

* spark reads hadoop conf remotely (oap-project#8)

### What changes were proposed in this pull request?
Originally RemoteShuffle load an empty Hadoop configuration by `val hadoopConf = new Configuration(false)`. However, Hadoop configuration needs to be loaded remotely. Some work is done in this pull request.
### How was this patch tested?
By a new unit test in `org.apache.spark.shuffle.remote.RemoteShuffleManagerSuite` where a fade server is mocked to provide Hadoop configuration remotely.

* Docs (oap-project#19)

Add configuration and tuning guides.

* remove remain/release in RemoteShuffleBlockIterator (oap-project#23)

The concrete buffer implementation of ManagedBuffer might be managed outside the JVM garbage collector. If the buffer is going to be passed around to a different thread, retain/release
should be called. But in RemoteShuffle, HadoopManagedBuffer is used, and it's definitely inside a JVM's lifecycle, so we don't need these operations.

* Read DAOS conf from local

* check style when compiling (oap-project#24)

Add scala style check

* Remove extra input stream layer, not needed because no buffer releasing (oap-project#25)

Extra layer brings overhead.

* All skip -> seek

* More tests on ShuffleManager, UTs on read Iterator covering index cache enabled path

* Data file asynchronous pre-fetch from multiple sources (oap-project#30)

   This PR resolves oap-project#16 , improving shuffle read performance by asynchronously reading whole ShuffleBlocks requests to memory(and then perform later operations) & constraining the number of reading requests in flight.

    In reduce stage, we observed a long time thread blocking for remote I/O to be ready. An optimization resembles vanilla Spark's can be made: send multiple block reading requests asynchronously before we actually need the data for compute, put the shuffle blocks fetched in a queue, and use the subsequent compute takes whichever block that's ready first.

    Constrain the requests in flight by maxBytesInFlight, maxReqsInFlight, maxBlocksInFlightPerAddress (these 3 are identical to vanilla Spark) and maxConcurrentFetches(introduced, for the maximum data file reading threads)

    More tests with bigger datasets, different map side partition lengths, index cache enabled/disabled, and constraints set/unset.

* Refactor & style

* Put index information in cache in map stage to avoid loading from storage in reduce stage (oap-project#33)

* Put index info in cache in map stage if index cache is enabled

* Refactor

* Fix

* Fix: Failing to fetch remote HDFS configurations should not crash the app (oap-project#36)

Minor fix to avoid exceptions originated by 2 reasons under HDFS: 1)port unset, 2)connection failed.

* Add corruption detect (oap-project#34)

* Add corruption detect

* Throw Exception only in task threads

* Only retry the failed map tasks

* Fix unsafe shuffle writer (oap-project#39)

Part of oap-project#37

When memory is insufficient and spill happens, the outputs produced by unsafe shuffle writer are wrong. It's due to the bugs in mergeSpillsWithTransferTo, missed the length parameter during Streams copying. Actually this merge path doesn't apply in remote shuffle over Hadoop storage, because the NIO-based transferTo optimization may not exist.

Added unit tests to ensure the correctness.

* Add UTs for RemoteSorter (oap-project#40)

Ensure RemoteSorter correctness.

* Shuffle read metrics update even after cleaning up (oap-project#42)

* Shuffle read metrics update even after cleaning up

* Style

* Not overidding Spark source code for better compatibility (oap-project#44)

* Not overidding Spark source code for better compatibility

* Fix: RpcEnv is not set in Executor

* Test fix

* Implement close

* Catch and log Exception during RemoteShuffleTransferService's closing

* Remove benchmarker

* Remove the logis that will never go through under the customized TransferService, throw Exception in those branches

* Set numCores using reflection, get from Dispatcher

* Move package

* Adding back benchmark

* Style and comments

* Remove reflection, let a config determine threads number for new transfer service

* Not reading hdfs-site.xml when storage is DAOS

* Move repository

* Move repository

Co-authored-by: Shuaiqi Ge <[email protected]>

* Integrate remote-shuffle in CI & more docs (oap-project#1167)

* CI

* Remove subdir travis

* Docs

* More docs

* Separate travis tests to different stages

* Fix

* Introduce new performance evaluation tool and deprecate the old micro-benchmark (oap-project#1172)

* [remote-shuffle]Refactor (oap-project#1206)

* Refactor

* Docs

* [remote-shuffle]Add docs for performance evaluation tool (#1233)

* Allow producing a test jar with dependencies, refactor

* Support -h help

* Add docs

* Disable hash-based shuffle writer by default (#1239)

* Reuse file handle in reduce stage (#1234)

* Remove perf evaluation tool

* Update: scheduler in Spark 3.0

* Basic update for Spark3.0, updated ShuffleManager and related codes

* Upper level batch fetch support, full custom metrics support(by Reynold)

* Update readme

* Modify Travis, empty install

* Modify docs

Co-authored-by: Shuaiqi Ge <[email protected]>
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

columnar whole stage codegen failed due to empty results
1 participant