Skip to content

Commit

Permalink
test(components): Added integration test for Sagemaker TrainingJob co…
Browse files Browse the repository at this point in the history
…mponent v2. (kubeflow#8247)

* Integration tests for sagemaker training v2

* removed redundant check

* removed redundant print

* added safety check

* pr changes

* updated python and kubernetes

* reverting dependency versions

* Revert "updated python and kubernetes"

This reverts commit e92034d.

* added linting
  • Loading branch information
ananth102 authored and jlyaoyuli committed Jan 5, 2023
1 parent e79af1b commit 04f7cef
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 4 deletions.
12 changes: 11 additions & 1 deletion components/aws/sagemaker/tests/integration_tests/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ RUN apt-get update --allow-releaseinfo-change && apt-get install -y --no-install
curl \
wget \
git \
jq
jq \
tar \
sudo

# Install eksctl
RUN curl --location "https://github.com/weaveworks/eksctl/releases/download/v0.86.0/eksctl_$(uname -s)_amd64.tar.gz" | tar xz -C /tmp \
Expand All @@ -23,6 +25,14 @@ RUN curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.18.0/b
RUN curl -sSL -o /usr/local/bin/argo https://github.com/argoproj/argo-workflows/releases/download/v2.8.0/argo-linux-amd64 \
&& chmod +x /usr/local/bin/argo

# Install helm (used to deploy the ACK SageMaker controller chart).
# NOTE(review): the "github.com" host below looks like a scrape/mirror
# artifact of "github.com" — confirm against the upstream Dockerfile.
RUN wget https://get.helm.sh/helm-v3.8.2-linux-amd64.tar.gz \
&& tar xvf helm-v3.8.2-linux-amd64.tar.gz && sudo mv linux-amd64/helm /usr/local/bin

# Install yq (YAML processor) — used by install_ack to edit chart values.yaml.
RUN wget https://github.com/mikefarah/yq/releases/download/v4.27.3/yq_linux_amd64 -O /usr/bin/yq &&\
chmod +x /usr/bin/yq

# Copy conda environment early to avoid cache busting
COPY ./components/aws/sagemaker/tests/integration_tests/environment.yml environment.yml

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import pytest
import os
import utils
from utils import kfp_client_utils
from utils import minio_utils
from utils import ack_utils
import ast


@pytest.mark.parametrize(
    "test_file_dir",
    [pytest.param("resources/config/ack-training-job", marks=pytest.mark.canary_test)],
)
def test_trainingjobV2(kfp_client, experiment_id, test_file_dir):
    """Happy-path integration test for the SageMaker TrainingJob v2 (ACK) component.

    Compiles and runs the pipeline described by ``test_file_dir/config.yaml``,
    then verifies through the ACK TrainingJob custom resource that the job
    completed and that the model-artifact URI emitted by the pipeline matches
    the one recorded on the custom resource.
    """
    k8s_client = ack_utils.k8s_client()
    # BUGFIX: a hard-coded re-assignment of test_file_dir previously shadowed
    # the parametrized fixture value; rely on the parameter instead.
    download_dir = utils.mkdir(os.path.join(test_file_dir, "generated"))
    test_params = utils.load_params(
        utils.replace_placeholders(
            os.path.join(test_file_dir, "config.yaml"),
            os.path.join(download_dir, "ack-training-job.yaml"),
        )
    )
    # Randomize the job name so repeated/concurrent runs do not collide.
    input_job_name = utils.generate_random_string(10) + "-trn-job"
    test_params["Arguments"]["training_job_name"] = input_job_name

    _, _, workflow_json = kfp_client_utils.compile_run_monitor_pipeline(
        kfp_client,
        experiment_id,
        test_params["PipelineDefinition"],
        test_params["Arguments"],
        download_dir,
        test_params["TestName"],
        test_params["Timeout"],
    )
    outputs = {
        "sagemaker-trainingjob": [
            "model_artifacts",
        ]
    }

    # Download the pipeline step's output artifacts from MinIO.
    output_files = minio_utils.artifact_download_iterator(
        workflow_json, outputs, download_dir
    )
    model_artifact = utils.read_from_file_in_tar(
        output_files["sagemaker-trainingjob"]["model_artifacts"]
    )

    # Verify the training job completed successfully on SageMaker (via the ACK CR).
    print(f"training job name: {input_job_name}")
    train_response = ack_utils.describe_training_job(k8s_client, input_job_name)
    assert train_response["status"]["trainingJobStatus"] == "Completed"

    # Verify the model artifact output was produced by THIS run: the URI from
    # the pipeline output must match the CR and embed the random job name.
    model_uri = ast.literal_eval(model_artifact)["s3ModelArtifacts"]
    print(f"model_artifact_url: {model_uri}")
    assert model_uri == train_response["status"]["modelArtifacts"]["s3ModelArtifacts"]
    assert input_job_name in model_uri

    utils.remove_dir(download_dir)


def test_terminate_trainingjob(kfp_client, experiment_id):
    """Verify that terminating a KFP run stops the underlying ACK TrainingJob.

    Starts the training pipeline, waits until the run reports "running",
    terminates it, then polls the TrainingJob custom resource until it reaches
    a Stopping/Stopped status.
    """
    k8s_client = ack_utils.k8s_client()
    test_file_dir = "resources/config/ack-training-job"
    download_dir = utils.mkdir(os.path.join(test_file_dir, "generated_terminate"))

    test_params = utils.load_params(
        utils.replace_placeholders(
            os.path.join(test_file_dir, "config.yaml"),
            os.path.join(download_dir, "woof.yaml"),
        )
    )
    # Short random prefix keeps the name within SageMaker's length limits.
    input_job_name = utils.generate_random_string(4) + "-terminate-job"
    test_params["Arguments"]["training_job_name"] = input_job_name

    run_id, _, _ = kfp_client_utils.compile_run_monitor_pipeline(
        kfp_client,
        experiment_id,
        test_params["PipelineDefinition"],
        test_params["Arguments"],
        download_dir,
        test_params["TestName"],
        60,  # wait up to 60s for the run to reach the "running" state
        "running",
    )
    print(f"Terminating run: {run_id} where Training job_name: {input_job_name}")
    kfp_client_utils.terminate_run(kfp_client, run_id)
    # PEP 8 rename of the local (was `desiredStatuses`).
    desired_statuses = ["Stopping", "Stopped"]
    # Poll the CR: 10 periods x 6 seconds = up to 60s for the stop to land.
    training_status_reached = ack_utils.wait_for_trainingjob_status(
        k8s_client, input_job_name, desired_statuses, 10, 6
    )
    assert training_status_reached

    utils.remove_dir(download_dir)
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ def kfp_client():
kfp_installed_namespace = utils.get_kfp_namespace()
return kfp.Client(namespace=kfp_installed_namespace)


def get_experiment_id(kfp_client):
exp_name = datetime.now().strftime("%Y-%m-%d-%H-%M")
try:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Integration-test configuration for the SageMaker TrainingJob v2 (ACK) component.
# ((PLACEHOLDER)) tokens are substituted by utils.replace_placeholders at test time.
PipelineDefinition: resources/definition/trainingv2_pipeline.py
TestName: ack-training-job
# Maximum seconds to wait for the pipeline run to finish.
Timeout: 600
ExpectedTrainingImage: ((KMEANS_REGISTRY)).dkr.ecr.((REGION)).amazonaws.com/kmeans:1
# Arguments are passed straight through as pipeline parameters.
Arguments:
  region: ((REGION))
  algorithm_specification:
    trainingImage: ((KMEANS_REGISTRY)).dkr.ecr.((REGION)).amazonaws.com/kmeans:1
    trainingInputMode: File
  # KMeans hyperparameters: 10 clusters over 784-dim (28x28 MNIST) features.
  hyper_parameters:
    k: "10"
    feature_dim: "784"
  input_data_config:
    - channelName: train
      dataSource:
        s3DataSource:
          s3URI: s3://((DATA_BUCKET))/mnist_kmeans_example/data
          s3DataType: S3Prefix
          s3DataDistributionType: FullyReplicated
      compressionType: None
      recordWrapperType: None
      inputMode: File
  resource_config:
    instanceCount: 1
    instanceType: ml.m4.xlarge
    volumeSizeInGB: 20
  stopping_condition:
    maxRuntimeInSeconds: 3600
  output_data_config:
    s3OutputPath: s3://((DATA_BUCKET))/mnist_kmeans_example/output
  # Booleans are passed as strings; the component parses them.
  enable_network_isolation: "True"
  enable_managed_spot_training: "False"
  enable_inter_container_traffic_encryption: "False"
  role_arn: ((SAGEMAKER_ROLE_ARN))
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import kfp
from kfp import components
from kfp import dsl

# Load the TrainingJob v2 component definition from its component spec.
sagemaker_TrainingJob_op = components.load_component_from_file(
    "../../TrainingJob/component.yaml"
)


@dsl.pipeline(name="TrainingJob", description="SageMaker TrainingJob component")
def TrainingJob(
    region="",
    algorithm_specification="",
    enable_inter_container_traffic_encryption="",
    enable_managed_spot_training="",
    enable_network_isolation="",
    hyper_parameters="",
    input_data_config="",
    output_data_config="",
    resource_config="",
    role_arn="",
    stopping_condition="",
    training_job_name="",
):
    """Single-step pipeline that forwards every parameter to the
    SageMaker TrainingJob v2 component."""
    training_step = sagemaker_TrainingJob_op(
        region=region,
        algorithm_specification=algorithm_specification,
        enable_inter_container_traffic_encryption=enable_inter_container_traffic_encryption,
        enable_managed_spot_training=enable_managed_spot_training,
        enable_network_isolation=enable_network_isolation,
        hyper_parameters=hyper_parameters,
        input_data_config=input_data_config,
        output_data_config=output_data_config,
        resource_config=resource_config,
        role_arn=role_arn,
        stopping_condition=stopping_condition,
        training_job_name=training_job_name,
    )


if __name__ == "__main__":
    kfp.compiler.Compiler().compile(
        TrainingJob, "SageMaker_trainingJob_pipeline" + ".yaml"
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Grants the KFP pipeline-runner service account the ACK SageMaker
# controller's ClusterRole, so pipeline pods may manage ACK custom resources.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: ack-sagemaker-controller-rolebinding
  # NOTE(review): ClusterRoleBinding is cluster-scoped, so this namespace
  # field is ignored by the API server — consider removing it.
  namespace: kubeflow
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: ack-sagemaker-controller
subjects:
  - kind: ServiceAccount
    name: pipeline-runner
    namespace: kubeflow
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ printf '{
"Condition": {
"StringEquals": {
"oidc.eks.'"${cluster_region}"'.amazonaws.com/id/'"${oidc_id}"':aud": "sts.amazonaws.com",
"oidc.eks.'"${cluster_region}"'.amazonaws.com/id/'"${oidc_id}"':sub": "system:serviceaccount:'"${service_namespace}"':'"${service_account}"'"
"oidc.eks.'"${cluster_region}"'.amazonaws.com/id/'"${oidc_id}"':sub":[
"system:serviceaccount:'"${service_namespace}"':'"${service_account}"'",
"system:serviceaccount:ack-system:ack-sagemaker-controller"
]
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ ROBOMAKER_EXECUTION_ROLE_ARN=${ROBOMAKER_EXECUTION_ROLE_ARN:-""}

SKIP_FSX_TESTS=${SKIP_FSX_TESTS:-"false"}

# ACK (AWS Controllers for Kubernetes) SageMaker controller settings.
ACK_RELEASE_VERSION=${ACK_RELEASE_VERSION:-"v0.4.3"}
# Enables `helm pull oci://` on older Helm clients; harmless on Helm >= 3.8.
HELM_EXPERIMENTAL_OCI=1
SERVICE=sagemaker
# Where the pulled controller chart is extracted for editing before install.
CHART_EXPORT_PATH=/tmp/chart
CHART_REF=sagemaker-chart

while getopts ":n:r:s:" opt; do
case $opt in
n)
Expand Down Expand Up @@ -83,6 +89,10 @@ function cleanup() {
set +e

cleanup_kfp
  # If installation failed before the ACK install step, only clean up ACK
  # resources when the ACK namespace variable was actually set.
if [[ -v ACK_K8S_NAMESPACE ]]; then
uninstall_ack
fi
delete_assumed_role

[ "${SKIP_KFP_OIDC_SETUP}" == "false" ] && delete_oidc_role
Expand Down Expand Up @@ -164,6 +174,35 @@ function install_kfp() {
echo "[Installing KFP] Pipeline pods are ready"
}

# Install the ACK SageMaker controller: pull its Helm chart from the public
# ECR OCI registry, point it at our region/IAM role, apply its CRDs, deploy
# the controller, and bind pipeline-runner to the controller's ClusterRole.
function install_ack(){

  local CHART_REPO=public.ecr.aws/aws-controllers-k8s/${CHART_REF}
  local CHART_PACKAGE=${CHART_REF}-${ACK_RELEASE_VERSION}.tgz
  # Global on purpose: cleanup() checks this to decide whether to uninstall.
  ACK_K8S_NAMESPACE=ack-system

  mkdir -p ${CHART_EXPORT_PATH}

  # Pull and unpack the controller chart so values.yaml can be edited in place.
  helm pull oci://${CHART_REPO} --version ${ACK_RELEASE_VERSION} -d ${CHART_EXPORT_PATH}
  tar xvf ${CHART_EXPORT_PATH}/${CHART_PACKAGE} -C ${CHART_EXPORT_PATH}

  # yq's env()/strenv() read from the environment, so OIDC_ROLE_ARN must be exported.
  export OIDC_ROLE_ARN
  cd ${CHART_EXPORT_PATH}/${SERVICE}-chart
  yq e '.aws.region = env(REGION)' -i values.yaml
  yq e '.serviceAccount.annotations."eks.amazonaws.com/role-arn" = strenv(OIDC_ROLE_ARN)' -i values.yaml
  cd -

  # CRDs are applied separately; --skip-crds keeps helm from re-applying them.
  kubectl apply -f ${CHART_EXPORT_PATH}/${SERVICE}-chart/crds
  helm install -n ${ACK_K8S_NAMESPACE} --create-namespace --skip-crds ack-${SERVICE}-controller \
    ${CHART_EXPORT_PATH}/${SERVICE}-chart
  # Let KFP's pipeline-runner service account manage ACK custom resources.
  kubectl apply -f ack-rbac.yaml
}

# Tear down ACK: delete any leftover TrainingJob custom resources first (so
# their finalizers run while the controller is still alive), then the CRDs,
# then the controller's namespace.
function uninstall_ack(){
  kubectl delete trainingjob --all -n ${KFP_NAMESPACE}
  kubectl delete -f ${CHART_EXPORT_PATH}/${SERVICE}-chart/crds
  kubectl delete namespace $ACK_K8S_NAMESPACE
}

function generate_oidc_role_name() {
OIDC_ROLE_NAME="$(echo "${DEPLOY_NAME}-kubeflow-role" | cut -c1-64)"
OIDC_ROLE_ARN="arn:aws:iam::${AWS_ACCOUNT_ID}:role/${OIDC_ROLE_NAME}"
Expand Down Expand Up @@ -261,6 +300,7 @@ fi
install_kfp
[ "${SKIP_KFP_OIDC_SETUP}" == "false" ] && install_oidc_role
generate_assumed_role
install_ack

pytest_args=( --region "${REGION}" --sagemaker-role-arn "${SAGEMAKER_EXECUTION_ROLE_ARN}" \
--s3-data-bucket "${S3_DATA_BUCKET}" --kfp-namespace "${KFP_NAMESPACE}" \
Expand All @@ -279,4 +319,4 @@ fi

DIR_THIS_FILE="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"

cd $DIR_THIS_FILE/../ && python -m pytest "${pytest_args[@]}" --junitxml ./integration_tests.log -n 10
cd $DIR_THIS_FILE/../ && python -m pytest "${pytest_args[@]}" --junitxml ./integration_tests.log -n 9
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from time import sleep
from kubernetes import client,config
import os

def k8s_client():
    """Build a Kubernetes API client from the local kubeconfig."""
    api_client = config.new_client_from_config()
    return api_client

def _get_resource(k8s_client, job_name, kvars):
    """Fetch a namespaced custom resource, like ``kubectl get <plural> JOB_NAME -n NAMESPACE``.

    Args:
        k8s_client: Kubernetes API client (see ``k8s_client()``).
        job_name: name of the custom resource (lower-cased before lookup).
        kvars: dict with the resource's "group", "version" and "plural".

    Returns:
        dict: the custom object as returned by the API server.

    Raises:
        ValueError: if the NAMESPACE environment variable is not set.
        kubernetes.client.ApiException: if the resource does not exist
            (the API raises 404 rather than returning None).
    """
    _api = client.CustomObjectsApi(k8s_client)
    namespace = os.environ.get("NAMESPACE")
    if not namespace:
        # Fail fast with a clear message instead of AttributeError on None.lower().
        raise ValueError("NAMESPACE environment variable must be set")
    job_description = _api.get_namespaced_custom_object(
        kvars["group"].lower(),
        kvars["version"].lower(),
        namespace.lower(),
        kvars["plural"].lower(),
        job_name.lower(),
    )
    return job_description

def describe_training_job(k8s_client, training_job_name):
    """Describe the ACK TrainingJob custom resource for the given job name."""
    kvars = {
        "group": "sagemaker.services.k8s.aws",
        "version": "v1alpha1",
        "plural": "trainingjobs",
    }
    return _get_resource(k8s_client, training_job_name, kvars)

# TODO: Make this a generalized function for non-job resources.
def wait_for_trainingjob_status(
    k8s_client, training_job_name, desiredStatuses, wait_periods, period_length
):
    """Poll the TrainingJob CR until its status is one of ``desiredStatuses``.

    Returns True as soon as a desired status is observed; returns False after
    ``wait_periods`` polls spaced ``period_length`` seconds apart.
    """
    for _ in range(wait_periods):
        current = describe_training_job(k8s_client, training_job_name)
        if current["status"]["trainingJobStatus"] in desiredStatuses:
            return True
        sleep(period_length)
    return False

0 comments on commit 04f7cef

Please sign in to comment.