Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-561] refine docs on sample configurations and code generation behavior #572

Merged
merged 7 commits into from
Nov 18, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 64 additions & 7 deletions docs/Configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ You can add these configuration into spark-defaults.conf to enable or disable th
| spark.executor.memory| To set up how much memory to be used for Spark Executor. | |
| spark.memory.offHeap.size| To set up how much memory to be used for Java OffHeap.<br /> Please notice Gazelle Plugin will leverage this setting to allocate memory space for native usage even offHeap is disabled. <br /> The value is based on your system and it is recommended to set it larger if you are facing Out of Memory issue in Gazelle Plugin | 30G |
| spark.sql.sources.useV1SourceList | Choose to use V1 source | avro |
| spark.sql.join.preferSortMergeJoin | To turn off preferSortMergeJoin in Spark | false |
| spark.sql.join.preferSortMergeJoin | To turn on/off preferSortMergeJoin in Spark. In gazelle we recomend to turn off this to get better performance | true |
| spark.plugins | To turn on Gazelle Plugin | com.intel.oap.GazellePlugin |
| spark.shuffle.manager | To turn on Gazelle Columnar Shuffle Plugin | org.apache.spark.shuffle.sort.ColumnarShuffleManager |
| spark.sql.shuffle.partitions | shuffle partition size, it's recomended to use the same number of your total cores | 200 |
| spark.oap.sql.columnar.batchscan | Enable or Disable Columnar Batchscan, default is true | true |
| spark.oap.sql.columnar.hashagg | Enable or Disable Columnar Hash Aggregate, default is true | true |
| spark.oap.sql.columnar.projfilter | Enable or Disable Columnar Project and Filter, default is true | true |
Expand All @@ -28,22 +29,70 @@ You can add these configuration into spark-defaults.conf to enable or disable th
| spark.oap.sql.columnar.nanCheck | Enable or Disable Nan Check, default is true | true |
| spark.oap.sql.columnar.hashCompare | Enable or Disable Hash Compare in HashJoins or HashAgg, default is true | true |
| spark.oap.sql.columnar.broadcastJoin | Enable or Disable Columnar BradcastHashJoin, default is true | true |
| spark.oap.sql.columnar.sortmergejoin.lazyread | Enable or Disable lazy reading on Sort result. On disable, whole partition will be cached before doing SortMergeJoin | false |
| spark.oap.sql.columnar.wholestagecodegen | Enable or Disable Columnar WholeStageCodeGen, default is true | true |
| spark.oap.sql.columnar.preferColumnar | Enable or Disable Columnar Operators, default is false.<br /> This parameter could impact the performance in different case. In some cases, to set false can get some performance boost. | false |
| spark.oap.sql.columnar.joinOptimizationLevel | Fallback to row operators if there are several continous joins | 6 |
| spark.sql.execution.arrow.maxRecordsPerBatch | Set up the Max Records per Batch | 10000 |
| spark.sql.execution.sort.spillThreshold | Set up the Max sort in memory threshold | 256M |
| spark.sql.execution.sort.spillThreshold | Set up the Max sort in memory threshold in bytes, default is disabled | -1 |
| spark.oap.sql.columnar.wholestagecodegen.breakdownTime | Enable or Disable metrics in Columnar WholeStageCodeGen | false |
| spark.oap.sql.columnar.tmp_dir | Set up a folder to store the codegen files | /tmp |
| spark.oap.sql.columnar.tmp_dir | Set up a folder to store the codegen files, default is disabled | "" |
| spark.oap.sql.columnar.shuffle.customizedCompression.codec | Set up the codec to be used for Columnar Shuffle, default is lz4| lz4 |
zhouyuan marked this conversation as resolved.
Show resolved Hide resolved
| spark.oap.sql.columnar.numaBinding | Set up NUMABinding, default is false| true |
| spark.oap.sql.columnar.coreRange | Set up the core range for NUMABinding, only works when numaBinding set to true. <br /> The setting is based on the number of cores in your system. Use 72 cores as an example. | 0-17,36-53 &#124;18-35,54-71 |
| spark.oap.sql.columnar.coreRange | Set up the core range for NUMABinding, only works when numaBinding set to true. <br /> The setting is based on the number of cores in your system(lscpu | grep node[0-4]). Use 72 cores as an example. | 0-17,36-53 &#124;18-35,54-71 |

## Example thrift-server configuration
Here's one example of the thrift-server configuration
```
THRIFTSERVER_CONFIG="--name ${runname}
--num-executors 72
--driver-memory 20g
--executor-memory 6g
--executor-cores 6
--master yarn
--deploy-mode client
--conf spark.executor.memoryOverhead=384
--conf spark.executorEnv.CC=/home/sparkuser/miniconda3/envs/arrow-new/bin/gcc
--conf spark.plugins=com.intel.oap.GazellePlugin
--conf spark.executorEnv.LD_LIBRARY_PATH=/home/sparkuser/miniconda3/envs/arrow-new/lib/:/home/sparkuser/miniconda3/envs/arrow-new/lib64/
--conf spark.executorEnv.LIBARROW_DIR=/home/sparkuser/miniconda3/envs/arrow-new
--conf spark.driver.extraClassPath=${nativesql_jars}
--conf spark.executor.extraClassPath=${nativesql_jars}
--conf spark.shuffle.manager=org.apache.spark.shuffle.sort.ColumnarShuffleManager
--conf spark.sql.join.preferSortMergeJoin=false
--conf spark.sql.inMemoryColumnarStorage.batchSize=${batchsize}
--conf spark.sql.execution.arrow.maxRecordsPerBatch=${batchsize}
--conf spark.sql.parquet.columnarReaderBatchSize=${batchsize}
--conf spark.sql.autoBroadcastJoinThreshold=10M
--conf spark.sql.broadcastTimeout=600
--conf spark.sql.crossJoin.enabled=true
--conf spark.driver.maxResultSize=20g
--hiveconf hive.server2.thrift.port=10001
--hiveconf hive.server2.thrift.bind.host=sr270
--conf spark.sql.codegen.wholeStage=true
--conf spark.sql.shuffle.partitions=432
--conf spark.memory.offHeap.enabled=true
--conf spark.memory.offHeap.size=15g
--conf spark.kryoserializer.buffer.max=128m
--conf spark.kryoserializer.buffer=32m
--conf spark.oap.sql.columnar.preferColumnar=false
--conf spark.oap.sql.columnar.sortmergejoin.lazyread=true
--conf spark.sql.execution.sort.spillThreshold=2147483648
--conf spark.executorEnv.LD_PRELOAD=/home/sparkuser/miniconda3/envs/arrow-new/lib/libjemalloc.so
--conf spark.executorEnv.MALLOC_CONF=background_thread:true,dirty_decay_ms:0,muzzy_decay_ms:0,narenas:2
--conf spark.executorEnv.MALLOC_ARENA_MAX=2
--conf spark.oap.sql.columnar.numaBinding=true
--conf spark.oap.sql.columnar.coreRange=0-35,72-107|36-71,108-143
--conf spark.oap.sql.columnar.joinOptimizationLevel=18
--conf spark.oap.sql.columnar.shuffle.customizedCompression.codec=lz4
--conf spark.yarn.appMasterEnv.LD_PRELOAD=/home/sparkuser/miniconda3/envs/arrow-new/lib/libjemalloc.so"
```

Below is an example for spark-default.conf, if you are using conda to install OAP project.

```
##### Columnar Process Configuration

## Example spark-defaults.conf
```
spark.sql.sources.useV1SourceList avro
spark.sql.join.preferSortMergeJoin false
spark.plugins com.intel.oap.GazellePlugin
Expand All @@ -64,4 +113,12 @@ Before you start spark, you must use below command to add some environment varia
export CC=$HOME/miniconda2/envs/oapenv/bin/gcc
export LIBARROW_DIR=$HOME/miniconda2/envs/oapenv/
```

## Notes on driver
In gazelle spark driver is used to C++ code generation for different operators. This means driver takes more tasks than vanilla Spark, so it's better to consider allocate more resource to driver. By default, driver will compile C++ codes with best optimizations targeting for local CPU architecture:
```
-O3 -march=native
```
This could be override by a local environment variable before starting driver:
```
export CODEGEN_OPTION=" -O1 -mavx2 -fno-semantic-interposition "
```
4 changes: 3 additions & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,10 @@ ${SPARK_HOME}/bin/spark-shell \
--verbose \
--master yarn \
--driver-memory 10G \
--conf --conf spark.plugins=com.intel.oap.GazellePlugin \
--conf spark.driver.extraClassPath=$PATH_TO_JAR/spark-arrow-datasource-standard-<version>-jar-with-dependencies.jar:$PATH_TO_JAR/spark-columnar-core-<version>-jar-with-dependencies.jar \
--conf spark.executor.extraClassPath=$PATH_TO_JAR/spark-arrow-datasource-standard-<version>-jar-with-dependencies.jar:$PATH_TO_JAR/spark-columnar-core-<version>-jar-with-dependencies.jar \
--conf spark.shuffle.manager=org.apache.spark.shuffle.sort.ColumnarShuffleManager \
--conf spark.driver.cores=1 \
--conf spark.executor.instances=12 \
--conf spark.executor.cores=6 \
Expand Down Expand Up @@ -155,7 +157,7 @@ We pick up 10 queries which can be fully supported in OAP v1.0-Gazelle Plugin an

![Performance](./image/decision_support_bench2_result_by_query.png)

Please notes the performance data is not an official from TPC-H and TPC-DS. The actual performance result may vary by individual workloads. Please try your workloads with Gazelle Plugin first and check the DAG or log file to see if all the operators can be supported in OAP-Gazelle Plugin.
Please notes the performance data is not an official from TPC-H and TPC-DS. The actual performance result may vary by individual workloads. Please try your workloads with Gazelle Plugin first and check the DAG or log file to see if all the operators can be supported in OAP-Gazelle Plugin. Please check the [detailed page](./performance.md) on performance tuning for TPC-H and TPC-DS workloads.

## Memory allocation
The memory usage in Gazelle Plugin is high. The allocations goes to two parts: 1) Java based allocation which is widely used in Arrow Java API. 2) Native side memory allocation used in each native kernel. We investigated the memory allocation behavior and made more turnings [here](./memory.md), the memroy footprint is stable during a TPC-DS power run.
Expand Down
21 changes: 21 additions & 0 deletions docs/performance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Performance tuning for Gazelle Plugin

It is complicated to tune for Spark workloads as each varies a lot. Here are several general tuning options on the most popular TPCH/TPC-DS benchmarking.

## Data Generating
On non-partiton tables, it's better to set the number of files to times of total HT cores in your cluster, e.g., there are 384 cores in your cluster, we'd better to set the file numbers to 2*384 or 3*384 to ensure. In this way Spark would only issue one task for each file(together with spark.sql.files.maxPartitionBytes option).

Note the spark.sql.files.maxRecordsPerFile will also split the files in to smaller ones. We may just disable this feature to -1.

## Shuffle Partitions
As Gazelle currently only supports hash based shuffle, it's recommended to use 1 or 2 times shuffle partitions of the total HT cores in the cluster. e.g., there are 384 cores in the cluster, it's better to use spark.sql.shuffle.partitions = 384 or 768. It this way it's most efficient for Gazelle.

## On-heap/Off-heap Memory Size
Unlike Spark, most of the memory usage in Gazelle would be off-heap based. So a big off-heap is recomended. There are still some small objects in On-heap thus a proper sized on-heap is also required. We are recommending below configurations for memory related settings.
```
--executor-cores 6
--executor-memory 6g // on-heap memory: 1G per core
--conf spark.executor.memoryOverhead=384 // not used, 384M should be enough
--conf spark.memory.offHeap.enabled=true // enable off-heap thus Spark can control the memory
--conf spark.memory.offHeap.size=15g // a big off-heap is required
```