update emr example and add initmode setter to kmeans (#767)
* update emr example and add init mode setter to kmeans

Signed-off-by: Erik Ordentlich <[email protected]>

* clean up

Signed-off-by: Erik Ordentlich <[email protected]>

* no auto fail to avoid a hang

Signed-off-by: Erik Ordentlich <[email protected]>

---------

Signed-off-by: Erik Ordentlich <[email protected]>
eordentlich authored Oct 25, 2024
1 parent 668b20e commit ced8bed
Showing 9 changed files with 120 additions and 39 deletions.
3 changes: 2 additions & 1 deletion notebooks/aws-emr/README.md
@@ -41,7 +41,8 @@ If you already have an AWS EMR account, you can run the example notebooks on an E
 aws emr create-cluster \
 --name ${CLUSTER_NAME} \
---release-label emr-6.10.0 \
+--release-label emr-7.3.0 \
+--ebs-root-volume-size=32 \
 --applications Name=Hadoop Name=Livy Name=Spark Name=JupyterEnterpriseGateway \
 --service-role EMR_DefaultRole \
 --log-uri s3://${S3_BUCKET}/logs \
31 changes: 18 additions & 13 deletions notebooks/aws-emr/init-bootstrap-action.sh
@@ -1,23 +1,28 @@
 #!/bin/bash
 
 set -ex
 
-sudo chmod a+rwx -R /sys/fs/cgroup/cpu,cpuacct
-sudo chmod a+rwx -R /sys/fs/cgroup/devices
 
-sudo yum install -y gcc openssl-devel bzip2-devel libffi-devel tar gzip wget make mysql-devel
-sudo bash -c "wget https://www.python.org/ftp/python/3.9.9/Python-3.9.9.tgz && tar xzf Python-3.9.9.tgz && cd Python-3.9.9 && ./configure --enable-optimizations && make altinstall"
+sudo mkdir -p /spark-rapids-cgroup/devices
+sudo mount -t cgroup -o devices cgroupv1-devices /spark-rapids-cgroup/devices
+sudo chmod a+rwx -R /spark-rapids-cgroup
 
-RAPIDS_VERSION=24.8.0
+sudo yum update -y
+sudo yum install -y gcc bzip2-devel libffi-devel tar gzip wget make
+sudo yum install -y mysql-devel --skip-broken
+sudo bash -c "wget https://www.python.org/ftp/python/3.10.9/Python-3.10.9.tgz && \
+    tar xzf Python-3.10.9.tgz && cd Python-3.10.9 && \
+    ./configure --enable-optimizations && make altinstall"
+
+RAPIDS_VERSION=24.10.0
 
+sudo /usr/local/bin/pip3.10 install --upgrade pip
+
 # install scikit-learn
-sudo /usr/local/bin/pip3.9 install scikit-learn
+sudo /usr/local/bin/pip3.10 install scikit-learn
 
 # install cudf and cuml
-sudo /usr/local/bin/pip3.9 install --no-cache-dir cudf-cu11==${RAPIDS_VERSION} \
-    cuml-cu11==${RAPIDS_VERSION} \
-    cuvs-cu11==${RAPIDS_VERSION} \
-    pylibraft-cu11==${RAPIDS_VERSION} \
-    rmm-cu11==${RAPIDS_VERSION} \
-    --extra-index-url=https://pypi.nvidia.com
+sudo /usr/local/bin/pip3.10 install --no-cache-dir cudf-cu12 --extra-index-url=https://pypi.nvidia.com --verbose
+sudo /usr/local/bin/pip3.10 install --no-cache-dir cuml-cu12 cuvs-cu12 --extra-index-url=https://pypi.nvidia.com --verbose
+
+sudo /usr/local/bin/pip3.10 list

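As a quick sanity check (an editorial suggestion, not part of the commit), one can confirm on a cluster node that the RAPIDS packages import under the freshly built Python 3.10 before submitting any work:

```bash
# verify cudf/cuml import cleanly under the altinstall'ed interpreter (illustrative)
sudo /usr/local/bin/python3.10 -c "import cudf, cuml; print(cudf.__version__, cuml.__version__)"
```
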
10 changes: 5 additions & 5 deletions notebooks/aws-emr/init-configurations.json
@@ -13,7 +13,7 @@
"yarn.nodemanager.resource-plugins.gpu.allowed-gpu-devices":"auto",
"yarn.nodemanager.resource-plugins.gpu.path-to-discovery-executables":"/usr/bin",
"yarn.nodemanager.linux-container-executor.cgroups.mount":"true",
"yarn.nodemanager.linux-container-executor.cgroups.mount-path":"/sys/fs/cgroup",
"yarn.nodemanager.linux-container-executor.cgroups.mount-path":"/spark-rapids-cgroup",
"yarn.nodemanager.linux-container-executor.cgroups.hierarchy":"yarn",
"yarn.nodemanager.container-executor.class":"org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor"
}
@@ -33,7 +33,7 @@
 {
 "Classification":"cgroups",
 "Properties":{
-"root":"/sys/fs/cgroup",
+"root":"/spark-rapids-cgroup",
 "yarn-hierarchy":"yarn"
 }
 }
@@ -68,14 +68,14 @@
"spark.sql.execution.arrow.pyspark.enabled":"true",
"spark.sql.execution.arrow.maxRecordsPerBatch":"100000",
"spark.sql.cache.serializer":"com.nvidia.spark.ParquetCachedBatchSerializer",
"spark.pyspark.python":"python3.9",
"spark.pyspark.driver.python":"python3.9",
"spark.pyspark.python":"/usr/local/bin/python3.10",
"spark.pyspark.driver.python":"/usr/local/bin/python3.10",
"spark.dynamicAllocation.enabled":"false",
"spark.driver.memory":"20g",
"spark.rpc.message.maxSize":"512",
"spark.executorEnv.CUPY_CACHE_DIR":"/tmp/.cupy",
"spark.executorEnv.NCCL_DEBUG":"INFO",
"spark.executorEnv.NCCL_SOCKET_IFNAME":"eth"
"spark.executorEnv.NCCL_SOCKET_IFNAME":"ens"
}
},
{
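Note that the new `/spark-rapids-cgroup` paths here must agree with the cgroup mount created by the bootstrap script above. A hedged verification sketch (not from the commit) that could be run on a cluster node:

```bash
# confirm the cgroup v1 devices controller is mounted where the YARN config points (illustrative)
mount | grep spark-rapids-cgroup
ls /spark-rapids-cgroup/devices
```
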
8 changes: 7 additions & 1 deletion python/benchmark/aws-emr/README.md
@@ -21,8 +21,14 @@ This directory contains shell scripts for running larger-scale benchmarks on an
 ```
 **Note**: this step should be repeated for each new version of the spark-rapids-ml package that you want to test.
 
+## Create an ssh key pair
+- The benchmark script needs ssh access to the EMR cluster, which requires creating an [EC2 key pair](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/create-key-pairs.html). Choose the **pem** format. After saving the private key locally with `.pem` as the file extension, set the following environment variable to point to its location (a CLI sketch for creating the key pair follows this section).
+```
+export KEYPAIR=/path/to/private/key.pem
+```

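For reference, the key pair can also be created entirely from the CLI. This is a sketch, assuming a reasonably recent AWS CLI; the key name `spark-rapids-ml-bench` is an illustrative placeholder, not a value from the commit:

```bash
# create an EC2 key pair in pem format and save the private key locally (illustrative)
aws ec2 create-key-pair --key-name spark-rapids-ml-bench --key-type rsa --key-format pem \
    --query "KeyMaterial" --output text > spark-rapids-ml-bench.pem
chmod 400 spark-rapids-ml-bench.pem
export KEYPAIR=$(pwd)/spark-rapids-ml-bench.pem
```
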
 ## Prepare Subnet
-- Print out available subnets in CLI then pick a SubnetId of your region (e.g. subnet-0744566f of AvailabilityZone us-east-2a in region Ohio). A subnet is required to start an EMR cluster.
+- Print out the available subnets via the CLI, then pick a SubnetId in your region (e.g. subnet-0744566f of AvailabilityZone us-east-2a in region Ohio); a filtered query sketch follows below. A subnet is required to start an EMR cluster. Make sure that the selected subnet allows SSH access (port 22) from the local host where you will invoke the benchmarking script. The public subnet in the default VPC of your account might be a suitable choice. See the AWS EMR documentation for more info on [VPCs for EMR](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-vpc-host-job-flows.html) and related info on SSH access in [managed security groups used by EMR](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-man-sec-groups.html).
 
 ```
 aws ec2 describe-subnets
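In a busy account, filtering the `describe-subnets` output can make the choice easier. A sketch using standard CLI options; the availability-zone value is an assumption to adapt:

```bash
# list subnet ids and their VPCs in one availability zone (illustrative)
aws ec2 describe-subnets \
    --filters Name=availability-zone,Values=us-east-2a \
    --query "Subnets[].{Subnet:SubnetId,Vpc:VpcId}" --output table
```
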
4 changes: 2 additions & 2 deletions python/benchmark/aws-emr/cpu-init-configurations.json
@@ -2,8 +2,8 @@
 {
 "Classification":"spark-defaults",
 "Properties":{
-"spark.pyspark.python":"python3.9",
-"spark.pyspark.driver.python":"python3.9"
+"spark.pyspark.python":"/usr/local/bin/python3.10",
+"spark.pyspark.driver.python":"/usr/local/bin/python3.10"
 }
 }
 ]
63 changes: 55 additions & 8 deletions python/benchmark/aws-emr/run_benchmark.sh
@@ -89,20 +89,67 @@ if [[ $? != 0 ]]; then
     exit 1
 fi
 
+ssh_command () {
+    aws emr wait cluster-running --cluster-id $CLUSTER_ID
+    if [[ $? != 0 ]]; then
+        echo "cluster terminated, exiting"
+        exit 1
+    fi
+    ssh -i $KEYPAIR -o StrictHostKeyChecking=no ec2-user@$masternode $1
+}
+
+get_masternode () {
+    aws emr list-instances --cluster-id $CLUSTER_ID --instance-group-type MASTER | grep PublicDnsName | grep -oP 'ec2[^"]*'
+}
+
+get_appid () {
+    ssh_command "hdfs dfs -text $stderr_path" | grep -oP "application_[0-9]*_[0-9]*" | head -n 1
+}
+
+get_appstatus () {
+    ssh_command "yarn application -status $app_id" | grep -P "\tState :" | grep -oP FINISHED
+}
+
 poll_stdout () {
     stdout_path=s3://${BENCHMARK_HOME}/logs/$1/steps/$2/stdout.gz
-    res="PENDING"
-    while [[ ${res} != *"COMPLETED"* ]]
+    stderr_path=s3://${BENCHMARK_HOME}/logs/$1/steps/$2/stderr.gz
+    masternode=$( get_masternode )
+
+    while [[ -z $masternode ]]; do
+        sleep 30
+        masternode=$( get_masternode )
+    done
+
+    echo masternode: $masternode
+    app_id=""
+
+    app_id=$( get_appid )
+    echo app_id: $app_id
+    while [[ -z $app_id ]]
     do
         sleep 30
-        res=$(aws emr describe-step --cluster-id $1 --step-id $2 | grep "State")
-        echo ${res}
-        if [[ ${res} == *"FAILED"* ]]; then
-            echo "Failed to finish step $2."
-            exit 1
-        fi
+        app_id=$( get_appid )
+        echo app_id: $app_id
     done
 
+    res=$( get_appstatus )
+    echo res: $res
+    while [[ ${res} != FINISHED ]]
+    do
+        sleep 30
+        res=$( get_appstatus )
+        echo res: ${res}
+    done
+
+    aws emr cancel-steps --cluster-id $1 --step-ids $2 --step-cancellation-option SEND_INTERRUPT
+
+    res=$( ssh_command "yarn application -status $app_id" | grep -P "\tFinal-State :" | sed -e 's/.*: *//g' )
+
+    if [[ $res != SUCCEEDED ]]; then
+        echo "benchmark step failed"
+        exit 1
+    fi
+
     # check if EMR stdout.gz is complete
     res=""
     while [[ ${res} != *"datetime"* ]]
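The rewritten `poll_stdout` waits on the YARN application found in the step's stderr (over ssh to the master node) rather than exiting as soon as the EMR step reports FAILED, in line with the "no auto fail to avoid a hang" commit message. A standalone sketch of the same polling pattern, under the assumptions that `$app_id` is set and the commands run on the master node:

```bash
# poll the YARN application until it reports FINISHED, then inspect its final status (illustrative)
while ! yarn application -status "$app_id" 2>/dev/null | grep -q "State : FINISHED"; do
    sleep 30
done
# Final-State distinguishes SUCCEEDED from FAILED/KILLED once the application finishes
yarn application -status "$app_id" | grep "Final-State"
```
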
32 changes: 23 additions & 9 deletions python/benchmark/aws-emr/start_cluster.sh
@@ -1,4 +1,6 @@
-#!/bin/bash
+#!/bin/bash -ex
+set -o pipefail

cluster_type=${1:-gpu}

# configure arguments
@@ -12,15 +14,22 @@ if [[ -z ${BENCHMARK_HOME} ]]; then
     exit 1
 fi
 
+if [[ -z ${KEYPAIR} ]]; then
+    echo "Please export KEYPAIR per README.md"
+    exit 1
+fi
+
 cluster_name=spark-rapids-ml-${cluster_type}
 cur_dir=$(pwd)
 
 if [[ ${cluster_type} == "gpu" ]]; then
-    core_type=g4dn.2xlarge
+    core_type=g5.2xlarge
     config_json="file://${cur_dir}/../../../notebooks/aws-emr/init-configurations.json"
+    bootstrap_actions="--bootstrap-actions Name='Spark Rapids ML Bootstrap action',Path=s3://${BENCHMARK_HOME}/init-bootstrap-action.sh"
 elif [[ ${cluster_type} == "cpu" ]]; then
-    core_type=m4.2xlarge
+    core_type=m6gd.2xlarge
     config_json="file://${cur_dir}/cpu-init-configurations.json"
+    bootstrap_actions=""
 else
     echo "unknown cluster type ${cluster_type}"
     echo "usage: ./${script_name} cpu|gpu"
@@ -29,17 +38,22 @@ fi

 start_cmd="aws emr create-cluster \
 --name ${cluster_name} \
---release-label emr-6.10.0 \
+--release-label emr-7.3.0 \
 --applications Name=Hadoop Name=Spark \
 --service-role EMR_DefaultRole \
 --log-uri s3://${BENCHMARK_HOME}/logs \
---ec2-attributes SubnetId=${SUBNET_ID},InstanceProfile=EMR_EC2_DefaultRole \
+--ec2-attributes KeyName=$(basename ${KEYPAIR} | sed -e 's/\.pem//g' ),SubnetId=${SUBNET_ID},InstanceProfile=EMR_EC2_DefaultRole \
+--ebs-root-volume-size=32 \
 --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m4.2xlarge \
 InstanceGroupType=CORE,InstanceCount=3,InstanceType=${core_type} \
---configurations ${config_json} \
---bootstrap-actions Name='Spark Rapids ML Bootstrap action',Path=s3://${BENCHMARK_HOME}/init-bootstrap-action.sh
+--configurations ${config_json} $bootstrap_actions
 "
 
-CLUSTER_ID=$(eval ${start_cmd} | tee /dev/tty | grep "ClusterId" | grep -o 'j-[0-9|A-Z]*')
+CLUSTER_ID=$( eval ${start_cmd} | tee /dev/tty | grep "ClusterId" | grep -o 'j-[0-9|A-Z]*')
 aws emr put-auto-termination-policy --cluster-id ${CLUSTER_ID} --auto-termination-policy IdleTimeout=1800
-echo "${CLUSTER_ID}"
+echo "waiting for cluster ${CLUSTER_ID} to start ... " 1>&2
+
+aws emr wait cluster-running --cluster-id $CLUSTER_ID
+
+echo "cluster started." 1>&2
+echo $CLUSTER_ID
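Putting the pieces together, a typical launch might look like the following sketch; the bucket prefix and subnet id are illustrative placeholders, not values from the commit:

```bash
# launch a GPU benchmark cluster (illustrative values; see the README steps above)
export BENCHMARK_HOME=my-bucket/benchmark   # S3 prefix holding init-bootstrap-action.sh and logs
export SUBNET_ID=subnet-0744566f            # picked via `aws ec2 describe-subnets`
export KEYPAIR=/path/to/private/key.pem     # pem-format EC2 key pair
./start_cluster.sh gpu
```
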
6 changes: 6 additions & 0 deletions python/src/spark_rapids_ml/clustering.py
@@ -298,6 +298,12 @@ def __init__(
         super().__init__()
         self._set_params(**self._input_kwargs)
 
+    def setInitMode(self, value: str) -> "KMeans":
+        """
+        Sets the value of :py:attr:`initMode`.
+        """
+        return self._set_params(initMode=value)
+
     def setK(self, value: int) -> "KMeans":
         """
         Sets the value of :py:attr:`k`.
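The new setter brings the GPU `KMeans` in line with Spark ML's `initMode` parameter (the test below exercises `"k-means||"`). A minimal usage sketch; the DataFrame `df` and its `"features"` column are illustrative assumptions, not from the commit:

```python
# sketch: configure initMode on the spark_rapids_ml KMeans estimator
from spark_rapids_ml.clustering import KMeans

kmeans = KMeans(k=2, maxIter=10).setInitMode("k-means||").setSeed(1)
# df: assumed Spark DataFrame with a "features" vector/array column
model = kmeans.fit(df)
preds = model.transform(df)
```
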
2 changes: 2 additions & 0 deletions python/tests/test_kmeans.py
@@ -403,6 +403,7 @@ def test_kmeans_spark_compat(

     kmeans.setSeed(1)
     kmeans.setMaxIter(10)
+    kmeans.setInitMode("k-means||")
     if isinstance(kmeans, SparkKMeans):
         kmeans.setWeightCol("weighCol")
     else:
@@ -412,6 +413,7 @@
     assert kmeans.getMaxIter() == 10
     assert kmeans.getK() == 2
     assert kmeans.getSeed() == 1
+    assert kmeans.getInitMode() == "k-means||"
 
     kmeans.clear(kmeans.maxIter)
     assert kmeans.getMaxIter() == 20
