Skip to content

Commit

Permalink
[ML-93][GPU] Add GPU support for PCA (#104)
Browse files Browse the repository at this point in the history
* Add pca

* add pca gpu script

* nit
  • Loading branch information
xwu99 authored Aug 2, 2021
1 parent eddc4c4 commit 7bf73e4
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 42 deletions.
2 changes: 1 addition & 1 deletion examples/kmeans/run-gpu-standalone.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ DATA_FILE=$HDFS_ROOT/data/sample_kmeans_data.txt
APP_JAR=target/oap-mllib-examples-$OAP_MLLIB_VERSION.jar
APP_CLASS=org.apache.spark.examples.ml.KMeansExample

USE_GPU=true
RESOURCE_FILE=$PWD/IntelGpuResourceFile.json
WORKER_GPU_AMOUNT=4
EXECUTOR_GPU_AMOUNT=1
TASK_GPU_AMOUNT=1
USE_GPU=true

# Should run in standalone mode
time $SPARK_HOME/bin/spark-submit --master $SPARK_MASTER -v \
Expand Down
20 changes: 20 additions & 0 deletions examples/pca/GetIntelGpuResources.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/usr/bin/env bash

# This script is a basic example script to get resource information about NVIDIA GPUs.
# It assumes the drivers are properly installed and the nvidia-smi command is available.
# It is not guaranteed to work on all setups so please test and customize as needed
# for your environment. It can be passed into SPARK via the config
# spark.{driver/executor}.resource.gpu.discoveryScript to allow the driver or executor to discover
# the GPUs it was allocated. It assumes you are running within an isolated container where the
# GPUs are allocated exclusively to that driver or executor.
# It outputs a JSON formatted string that is expected by the
# spark.{driver/executor}.resource.gpu.discoveryScript config.
#
# Example output: {"name": "gpu", "addresses":["0","1","2","3","4","5","6","7"]}

#ADDRS=`nvidia-smi --query-gpu=index --format=csv,noheader | sed -e ':a' -e 'N' -e'$!ba' -e 's/\n/","/g'`
#echo {\"name\": \"gpu\", \"addresses\":[\"$ADDRS\"]}
#ADDRS="0","1","2","3","4","5","6","7"
#echo {\"name\": \"gpu\", \"addresses\":[\"$ADDRS\"]}

echo {\"name\": \"gpu\", \"addresses\":[\"0\",\"1\",\"2\",\"3\"]}
1 change: 1 addition & 0 deletions examples/pca/IntelGpuResourceFile.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[{"id":{"componentName": "spark.worker","resourceName":"gpu"},"addresses":["0","1","2","3"]}]
38 changes: 38 additions & 0 deletions examples/pca/run-gpu-standalone.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#!/usr/bin/env bash

source ../../conf/env.sh

APP_JAR=target/oap-mllib-examples-$OAP_MLLIB_VERSION.jar
APP_CLASS=org.apache.spark.examples.ml.PCAExample

USE_GPU=true
RESOURCE_FILE=$PWD/IntelGpuResourceFile.json
WORKER_GPU_AMOUNT=4
EXECUTOR_GPU_AMOUNT=1
TASK_GPU_AMOUNT=1

# Should run in standalone mode
time $SPARK_HOME/bin/spark-submit --master $SPARK_MASTER -v \
--num-executors $SPARK_NUM_EXECUTORS \
--executor-cores $SPARK_EXECUTOR_CORES \
--total-executor-cores $SPARK_TOTAL_CORES \
--driver-memory $SPARK_DRIVER_MEMORY \
--executor-memory $SPARK_EXECUTOR_MEMORY \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf "spark.default.parallelism=$SPARK_DEFAULT_PARALLELISM" \
--conf "spark.sql.shuffle.partitions=$SPARK_DEFAULT_PARALLELISM" \
--conf "spark.driver.extraClassPath=$SPARK_DRIVER_CLASSPATH" \
--conf "spark.executor.extraClassPath=$SPARK_EXECUTOR_CLASSPATH" \
--conf "spark.oap.mllib.useGPU=$USE_GPU" \
--conf "spark.worker.resourcesFile=$RESOURCE_FILE" \
--conf "spark.worker.resource.gpu.amount=$WORKER_GPU_AMOUNT" \
--conf "spark.executor.resource.gpu.amount=$EXECUTOR_GPU_AMOUNT" \
--conf "spark.task.resource.gpu.amount=$TASK_GPU_AMOUNT" \
--conf "spark.shuffle.reduceLocality.enabled=false" \
--conf "spark.network.timeout=1200s" \
--conf "spark.task.maxFailures=1" \
--jars $OAP_MLLIB_JAR \
--class $APP_CLASS \
$APP_JAR \
2>&1 | tee PCA-$(date +%m%d_%H_%M_%S).log

54 changes: 36 additions & 18 deletions mllib-dal/src/main/native/GPU.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,27 +51,45 @@ int getLocalRank(ccl::communicator &comm, int size, int rank) {
// return 0;
}

void setGPUContext(ccl::communicator &comm, jint *gpu_idx, int n_gpu) {
int rank = comm.rank();
int size = comm.size();
//void setGPUContext(ccl::communicator &comm, jint *gpu_idx, int n_gpu) {
// int rank = comm.rank();
// int size = comm.size();
//
// /* Create GPU device from local rank and set execution context */
// auto local_rank = getLocalRank(comm, size, rank);
// auto gpus = get_gpus();
//
// std::cout << rank << " " << size << " " << local_rank << " " << n_gpu
// << std::endl;
//
// for (int i = 0; i < n_gpu; i++) {
// std::cout << gpu_idx[i] << std::endl;
// }
//
// // auto rank_gpu = gpus[gpu_idx[local_rank % n_gpu]];
// static auto rank_gpu = gpus[0];
//
// static cl::sycl::queue queue(rank_gpu);
// std::cout << "SyclExecutionContext" << std::endl;
// static daal::services::SyclExecutionContext ctx(queue);
// std::cout << "setDefaultExecutionContext" << std::endl;
// daal::services::Environment::getInstance()->setDefaultExecutionContext(ctx);
//}

/* Create GPU device from local rank and set execution context */
auto local_rank = getLocalRank(comm, size, rank);
sycl::device getAssignedGPU(ccl::communicator &comm, int size, int rankId,
jint *gpu_indices, int n_gpu
) {
auto local_rank = getLocalRank(comm, size, rankId);
auto gpus = get_gpus();

std::cout << rank << " " << size << " " << local_rank << " " << n_gpu
std::cout << "rank: " << rankId << " size: " << size
<< " local_rank: " << local_rank << " n_gpu: " << n_gpu
<< std::endl;

for (int i = 0; i < n_gpu; i++) {
std::cout << gpu_idx[i] << std::endl;
}

// auto rank_gpu = gpus[gpu_idx[local_rank % n_gpu]];
static auto rank_gpu = gpus[0];
auto gpu_selected = gpu_indices[local_rank % n_gpu];
std::cout << "GPU selected for current rank: " << gpu_selected
<< std::endl;
auto rank_gpu = gpus[gpu_selected % gpus.size()];

static cl::sycl::queue queue(rank_gpu);
std::cout << "SyclExecutionContext" << std::endl;
static daal::services::SyclExecutionContext ctx(queue);
std::cout << "setDefaultExecutionContext" << std::endl;
daal::services::Environment::getInstance()->setDefaultExecutionContext(ctx);
}
return rank_gpu;
}
5 changes: 4 additions & 1 deletion mllib-dal/src/main/native/GPU.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,7 @@

int getLocalRank(ccl::communicator &comm, int size, int rank);
std::vector<sycl::device> get_gpus();
// void setGPUContext(ccl::communicator &comm, jint *gpu_idx, int n_gpu);
// void setGPUContext(ccl::communicator &comm, jint *gpu_idx, int n_gpu);

sycl::device getAssignedGPU(ccl::communicator &comm, int size, int rankId,
jint *gpu_indices, int n_gpu);
22 changes: 6 additions & 16 deletions mllib-dal/src/main/native/KMeansDALImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,34 +247,24 @@ Java_org_apache_spark_ml_clustering_KMeansDALImpl_cKMeansDALComputeWithInitCente
jobject resultObj) {

ccl::communicator &comm = getComm();
int rankId = comm.rank();
int size = comm.size();
int rankId = comm.rank();

NumericTablePtr pData = *((NumericTablePtr *)pNumTabData);
NumericTablePtr centroids = *((NumericTablePtr *)pNumTabCenters);

jlong ret = 0L;
if (use_gpu) {
int n_gpu = env->GetArrayLength(gpu_idx_array);
cout << "oneDAL (native): use GPU kernels with " << n_gpu << " GPU(s)"
<< endl;

int n_gpu = env->GetArrayLength(gpu_idx_array);
jint *gpu_indices = env->GetIntArrayElements(gpu_idx_array, 0);

auto local_rank = getLocalRank(comm, size, rankId);
auto gpus = get_gpus();

std::cout << "rank: " << rankId << " size: " << size
<< " local_rank: " << local_rank << " n_gpu: " << n_gpu
<< std::endl;

auto gpu_selected = gpu_indices[local_rank % n_gpu];
std::cout << "GPU selected for current rank: " << gpu_selected
std::cout << "oneDAL (native): use GPU kernels with " << n_gpu << " GPU(s)"
<< std::endl;
auto rank_gpu = gpus[gpu_selected % gpus.size()];

auto assigned_gpu = getAssignedGPU(comm, size, rankId, gpu_indices, n_gpu);

// Set SYCL context
cl::sycl::queue queue(rank_gpu);
cl::sycl::queue queue(assigned_gpu);
daal::services::SyclExecutionContext ctx(queue);
daal::services::Environment::getInstance()->setDefaultExecutionContext(
ctx);
Expand Down
5 changes: 3 additions & 2 deletions mllib-dal/src/main/native/PCADALImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ typedef double algorithmFPType; /* Algorithm floating-point type */
/*
* Class: org_apache_spark_ml_feature_PCADALImpl
* Method: cPCATrainDAL
* Signature: (JIIILorg/apache/spark/ml/feature/PCAResult;)J
* Signature: (JIIIZ[ILorg/apache/spark/ml/feature/PCAResult;)J
*/

JNIEXPORT jlong JNICALL
Java_org_apache_spark_ml_feature_PCADALImpl_cPCATrainDAL(
JNIEnv *env, jobject obj, jlong pNumTabData, jint k, jint executor_num,
jint executor_cores, jobject resultObj) {
jint executor_cores, jboolean use_gpu, jintArray gpu_idx_array, jobject resultObj) {

using daal::byte;

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@
package org.apache.spark.ml.feature

import java.util.Arrays

import com.intel.daal.data_management.data.{HomogenNumericTable, NumericTable}

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.util.{OneCCL, OneDAL, Utils}
Expand Down Expand Up @@ -48,7 +47,17 @@ class PCADALImpl(val k: Int,

val kvsIPPort = kvsIP + "_" + kvsPort

val sparkContext = data.sparkContext
val useGPU = sparkContext.conf.getBoolean("spark.oap.mllib.useGPU", false)

val results = coalescedTables.mapPartitionsWithIndex { (rank, table) =>
val gpuIndices = if (useGPU) {
val resources = TaskContext.get().resources()
resources("gpu").addresses.map(_.toInt)
} else {
null
}

val tableArr = table.next()
OneCCL.init(executorNum, rank, kvsIPPort)

Expand All @@ -58,6 +67,8 @@ class PCADALImpl(val k: Int,
k,
executorNum,
executorCores,
useGPU,
gpuIndices,
result
)

Expand Down Expand Up @@ -139,6 +150,8 @@ class PCADALImpl(val k: Int,
k: Int,
executor_num: Int,
executor_cores: Int,
useGPU: Boolean,
gpuIndices: Array[Int],
result: PCAResult): Long

}

0 comments on commit 7bf73e4

Please sign in to comment.