From 04f7ceffbd478a6842ff9606c5b49eff44904a3e Mon Sep 17 00:00:00 2001 From: ananth102 Date: Wed, 14 Sep 2022 17:42:04 -0700 Subject: [PATCH] test(components): Added integration test for Sagemaker TrainingJob component v2. (#8247) * Integration tests for sagemaker training v2 * removed redundant check * removed redundant print * added safety check * pr changes * updated python and kubernetes * reverting dependency versions * Revert "updated python and kubernetes" This reverts commit e92034d5f9c2761fedbd2985052bc2c60d8390bf. * added linting --- .../tests/integration_tests/Dockerfile | 12 ++- .../test_v2_train_component.py | 96 +++++++++++++++++++ .../tests/integration_tests/conftest.py | 1 - .../config/ack-training-job/config.yaml | 34 +++++++ .../definition/trainingv2_pipeline.py | 39 ++++++++ .../integration_tests/scripts/ack-rbac.yaml | 13 +++ .../scripts/generate_trust_policy | 5 +- .../scripts/run_integration_tests | 42 +++++++- .../integration_tests/utils/ack_utils.py | 39 ++++++++ 9 files changed, 277 insertions(+), 4 deletions(-) create mode 100644 components/aws/sagemaker/tests/integration_tests/component_tests/test_v2_train_component.py create mode 100644 components/aws/sagemaker/tests/integration_tests/resources/config/ack-training-job/config.yaml create mode 100644 components/aws/sagemaker/tests/integration_tests/resources/definition/trainingv2_pipeline.py create mode 100644 components/aws/sagemaker/tests/integration_tests/scripts/ack-rbac.yaml create mode 100644 components/aws/sagemaker/tests/integration_tests/utils/ack_utils.py diff --git a/components/aws/sagemaker/tests/integration_tests/Dockerfile b/components/aws/sagemaker/tests/integration_tests/Dockerfile index b323857a75c..45b0fcf5c40 100644 --- a/components/aws/sagemaker/tests/integration_tests/Dockerfile +++ b/components/aws/sagemaker/tests/integration_tests/Dockerfile @@ -4,7 +4,9 @@ RUN apt-get update --allow-releaseinfo-change && apt-get install -y --no-install curl \ wget \ git \ - jq + jq \ + tar \ + sudo # Install eksctl RUN curl --location "https://github.com/weaveworks/eksctl/releases/download/v0.86.0/eksctl_$(uname -s)_amd64.tar.gz" | tar xz -C /tmp \ @@ -23,6 +25,14 @@ RUN curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.18.0/b RUN curl -sSL -o /usr/local/bin/argo https://github.com/argoproj/argo-workflows/releases/download/v2.8.0/argo-linux-amd64 \ && chmod +x /usr/local/bin/argo +#Install helm(for ack) +RUN wget https://get.helm.sh/helm-v3.8.2-linux-amd64.tar.gz \ + && tar xvf helm-v3.8.2-linux-amd64.tar.gz && sudo mv linux-amd64/helm /usr/local/bin + +#Install yq for ack +RUN wget https://github.com/mikefarah/yq/releases/download/v4.27.3/yq_linux_amd64 -O /usr/bin/yq &&\ + chmod +x /usr/bin/yq + # Copy conda environment early to avoid cache busting COPY ./components/aws/sagemaker/tests/integration_tests/environment.yml environment.yml diff --git a/components/aws/sagemaker/tests/integration_tests/component_tests/test_v2_train_component.py b/components/aws/sagemaker/tests/integration_tests/component_tests/test_v2_train_component.py new file mode 100644 index 00000000000..fd9196622f9 --- /dev/null +++ b/components/aws/sagemaker/tests/integration_tests/component_tests/test_v2_train_component.py @@ -0,0 +1,96 @@ +import pytest +import os +import utils +from utils import kfp_client_utils +from utils import minio_utils +from utils import ack_utils +import ast + + +@pytest.mark.parametrize( + "test_file_dir", + [pytest.param("resources/config/ack-training-job", marks=pytest.mark.canary_test)], +) +def test_trainingjobV2(kfp_client, experiment_id, test_file_dir): + k8s_client = ack_utils.k8s_client() + test_file_dir = "resources/config/ack-training-job" + download_dir = utils.mkdir(os.path.join(test_file_dir + "/generated")) + test_params = utils.load_params( + utils.replace_placeholders( + os.path.join(test_file_dir, "config.yaml"), + os.path.join(download_dir, "ack-training-job.yaml"), + ) + ) + input_job_name = utils.generate_random_string(10) + "-trn-job" + test_params["Arguments"]["training_job_name"] = input_job_name + + _, _, workflow_json = kfp_client_utils.compile_run_monitor_pipeline( + kfp_client, + experiment_id, + test_params["PipelineDefinition"], + test_params["Arguments"], + download_dir, + test_params["TestName"], + test_params["Timeout"], + ) + outputs = { + "sagemaker-trainingjob": [ + "model_artifacts", + ] + } + + # Get output data + output_files = minio_utils.artifact_download_iterator( + workflow_json, outputs, download_dir + ) + model_artifact = utils.read_from_file_in_tar( + output_files["sagemaker-trainingjob"]["model_artifacts"] + ) + + # Verify Training job was successful on SageMaker + print(f"training job name: {input_job_name}") + train_response = ack_utils.describe_training_job(k8s_client, input_job_name) + assert train_response["status"]["trainingJobStatus"] == "Completed" + + # Verify model artifacts output was generated from this run + model_uri = ast.literal_eval(model_artifact)["s3ModelArtifacts"] + print(f"model_artifact_url: {model_uri}") + assert model_uri == train_response["status"]["modelArtifacts"]["s3ModelArtifacts"] + assert input_job_name in model_uri + + utils.remove_dir(download_dir) + + +def test_terminate_trainingjob(kfp_client, experiment_id): + k8s_client = ack_utils.k8s_client() + test_file_dir = "resources/config/ack-training-job" + download_dir = utils.mkdir(os.path.join(test_file_dir + "/generated_terminate")) + + test_params = utils.load_params( + utils.replace_placeholders( + os.path.join(test_file_dir, "config.yaml"), + os.path.join(download_dir, "woof.yaml"), + ) + ) + input_job_name = utils.generate_random_string(4) + "-terminate-job" + test_params["Arguments"]["training_job_name"] = input_job_name + + run_id, _, _ = kfp_client_utils.compile_run_monitor_pipeline( + kfp_client, + experiment_id, + test_params["PipelineDefinition"], + test_params["Arguments"], + download_dir, + test_params["TestName"], + 60, + "running", + ) + print(f"Terminating run: {run_id} where Training job_name: {input_job_name}") + kfp_client_utils.terminate_run(kfp_client, run_id) + desiredStatuses = ["Stopping", "Stopped"] + training_status_reached = ack_utils.wait_for_trainingjob_status( + k8s_client, input_job_name, desiredStatuses, 10, 6 + ) + assert training_status_reached + + utils.remove_dir(download_dir) diff --git a/components/aws/sagemaker/tests/integration_tests/conftest.py b/components/aws/sagemaker/tests/integration_tests/conftest.py index 2adf0c843e7..2a14d015cb4 100644 --- a/components/aws/sagemaker/tests/integration_tests/conftest.py +++ b/components/aws/sagemaker/tests/integration_tests/conftest.py @@ -148,7 +148,6 @@ def kfp_client(): kfp_installed_namespace = utils.get_kfp_namespace() return kfp.Client(namespace=kfp_installed_namespace) - def get_experiment_id(kfp_client): exp_name = datetime.now().strftime("%Y-%m-%d-%H-%M") try: diff --git a/components/aws/sagemaker/tests/integration_tests/resources/config/ack-training-job/config.yaml b/components/aws/sagemaker/tests/integration_tests/resources/config/ack-training-job/config.yaml new file mode 100644 index 00000000000..060a66b520c --- /dev/null +++ b/components/aws/sagemaker/tests/integration_tests/resources/config/ack-training-job/config.yaml @@ -0,0 +1,34 @@ +PipelineDefinition: resources/definition/trainingv2_pipeline.py +TestName: ack-training-job +Timeout: 600 +ExpectedTrainingImage: ((KMEANS_REGISTRY)).dkr.ecr.((REGION)).amazonaws.com/kmeans:1 +Arguments: + region: ((REGION)) + algorithm_specification: + trainingImage: ((KMEANS_REGISTRY)).dkr.ecr.((REGION)).amazonaws.com/kmeans:1 + trainingInputMode: File + hyper_parameters: + k: "10" + feature_dim: "784" + input_data_config: + - channelName: train + dataSource: + s3DataSource: + s3URI: s3://((DATA_BUCKET))/mnist_kmeans_example/data + s3DataType: S3Prefix + s3DataDistributionType: FullyReplicated + compressionType: None + recordWrapperType: None + inputMode: File + resource_config: + instanceCount: 1 + instanceType: ml.m4.xlarge + volumeSizeInGB: 20 + stopping_condition: + maxRuntimeInSeconds: 3600 + output_data_config: + s3OutputPath: s3://((DATA_BUCKET))/mnist_kmeans_example/output + enable_network_isolation: "True" + enable_managed_spot_training: "False" + enable_inter_container_traffic_encryption: "False" + role_arn: ((SAGEMAKER_ROLE_ARN)) \ No newline at end of file diff --git a/components/aws/sagemaker/tests/integration_tests/resources/definition/trainingv2_pipeline.py b/components/aws/sagemaker/tests/integration_tests/resources/definition/trainingv2_pipeline.py new file mode 100644 index 00000000000..6e1814b67e7 --- /dev/null +++ b/components/aws/sagemaker/tests/integration_tests/resources/definition/trainingv2_pipeline.py @@ -0,0 +1,39 @@ +import kfp +from kfp import components +from kfp import dsl + +sagemaker_TrainingJob_op = components.load_component_from_file("../../TrainingJob/component.yaml") + +@dsl.pipeline(name="TrainingJob", description="SageMaker TrainingJob component") +def TrainingJob( + region="", + algorithm_specification="", + enable_inter_container_traffic_encryption="", + enable_managed_spot_training="", + enable_network_isolation="", + hyper_parameters="", + input_data_config="", + output_data_config="", + resource_config="", + role_arn="", + stopping_condition="", + training_job_name="", +): + TrainingJob = sagemaker_TrainingJob_op( + region=region, + algorithm_specification=algorithm_specification, + enable_inter_container_traffic_encryption=enable_inter_container_traffic_encryption, + enable_managed_spot_training=enable_managed_spot_training, + enable_network_isolation=enable_network_isolation, + hyper_parameters=hyper_parameters, + input_data_config=input_data_config, + output_data_config=output_data_config, + resource_config=resource_config, + role_arn=role_arn, + stopping_condition=stopping_condition, + training_job_name=training_job_name, + ) +if __name__ == "__main__": + kfp.compiler.Compiler().compile( + TrainingJob, "SageMaker_trainingJob_pipeline" + ".yaml" + ) \ No newline at end of file diff --git a/components/aws/sagemaker/tests/integration_tests/scripts/ack-rbac.yaml b/components/aws/sagemaker/tests/integration_tests/scripts/ack-rbac.yaml new file mode 100644 index 00000000000..f832fc5f751 --- /dev/null +++ b/components/aws/sagemaker/tests/integration_tests/scripts/ack-rbac.yaml @@ -0,0 +1,13 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: ack-sagemaker-controller-rolebinding + namespace: kubeflow +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: ack-sagemaker-controller +subjects: +- kind: ServiceAccount + name: pipeline-runner + namespace: kubeflow \ No newline at end of file diff --git a/components/aws/sagemaker/tests/integration_tests/scripts/generate_trust_policy b/components/aws/sagemaker/tests/integration_tests/scripts/generate_trust_policy index 1c10fa10fe8..0303c5be27a 100755 --- a/components/aws/sagemaker/tests/integration_tests/scripts/generate_trust_policy +++ b/components/aws/sagemaker/tests/integration_tests/scripts/generate_trust_policy @@ -30,7 +30,10 @@ printf '{ "Condition": { "StringEquals": { "oidc.eks.'"${cluster_region}"'.amazonaws.com/id/'"${oidc_id}"':aud": "sts.amazonaws.com", - "oidc.eks.'"${cluster_region}"'.amazonaws.com/id/'"${oidc_id}"':sub": "system:serviceaccount:'"${service_namespace}"':'"${service_account}"'" + "oidc.eks.'"${cluster_region}"'.amazonaws.com/id/'"${oidc_id}"':sub":[ + "system:serviceaccount:'"${service_namespace}"':'"${service_account}"'", + "system:serviceaccount:ack-system:ack-sagemaker-controller" + ] } } } diff --git a/components/aws/sagemaker/tests/integration_tests/scripts/run_integration_tests b/components/aws/sagemaker/tests/integration_tests/scripts/run_integration_tests index e7fc06bce9c..d7ad821b008 100755 --- a/components/aws/sagemaker/tests/integration_tests/scripts/run_integration_tests +++ b/components/aws/sagemaker/tests/integration_tests/scripts/run_integration_tests @@ -38,6 +38,12 @@ ROBOMAKER_EXECUTION_ROLE_ARN=${ROBOMAKER_EXECUTION_ROLE_ARN:-""} SKIP_FSX_TESTS=${SKIP_FSX_TESTS:-"false"} +ACK_RELEASE_VERSION=${ACK_RELEASE_VERSION:-"v0.4.3"} +HELM_EXPERIMENTAL_OCI=1 +SERVICE=sagemaker +CHART_EXPORT_PATH=/tmp/chart +CHART_REF=sagemaker-chart + while getopts ":n:r:s:" opt; do case $opt in n) @@ -83,6 +89,10 @@ function cleanup() { set +e cleanup_kfp + # If installation fails before ack installation resources should be freed. + if [[ -v ACK_K8S_NAMESPACE ]]; then + uninstall_ack + fi delete_assumed_role [ "${SKIP_KFP_OIDC_SETUP}" == "false" ] && delete_oidc_role @@ -164,6 +174,35 @@ function install_kfp() { echo "[Installing KFP] Pipeline pods are ready" } +function install_ack(){ + + local CHART_REPO=public.ecr.aws/aws-controllers-k8s/${CHART_REF} + local CHART_PACKAGE=${CHART_REF}-${ACK_RELEASE_VERSION}.tgz + ACK_K8S_NAMESPACE=ack-system + + mkdir -p ${CHART_EXPORT_PATH} + + helm pull oci://${CHART_REPO} --version ${ACK_RELEASE_VERSION} -d ${CHART_EXPORT_PATH} + tar xvf ${CHART_EXPORT_PATH}/${CHART_PACKAGE} -C ${CHART_EXPORT_PATH} + + export OIDC_ROLE_ARN + cd ${CHART_EXPORT_PATH}/${SERVICE}-chart + yq e '.aws.region = env(REGION)' -i values.yaml + yq e '.serviceAccount.annotations."eks.amazonaws.com/role-arn" = strenv(OIDC_ROLE_ARN)' -i values.yaml + cd - + + kubectl apply -f ${CHART_EXPORT_PATH}/${SERVICE}-chart/crds + helm install -n ${ACK_K8S_NAMESPACE} --create-namespace --skip-crds ack-${SERVICE}-controller \ + ${CHART_EXPORT_PATH}/${SERVICE}-chart + kubectl apply -f ack-rbac.yaml +} + +function uninstall_ack(){ + kubectl delete trainingjob --all -n ${KFP_NAMESPACE} + kubectl delete -f ${CHART_EXPORT_PATH}/${SERVICE}-chart/crds + kubectl delete namespace $ACK_K8S_NAMESPACE +} + function generate_oidc_role_name() { OIDC_ROLE_NAME="$(echo "${DEPLOY_NAME}-kubeflow-role" | cut -c1-64)" OIDC_ROLE_ARN="arn:aws:iam::${AWS_ACCOUNT_ID}:role/${OIDC_ROLE_NAME}" @@ -261,6 +300,7 @@ fi install_kfp [ "${SKIP_KFP_OIDC_SETUP}" == "false" ] && install_oidc_role generate_assumed_role +install_ack pytest_args=( --region "${REGION}" --sagemaker-role-arn "${SAGEMAKER_EXECUTION_ROLE_ARN}" \ --s3-data-bucket "${S3_DATA_BUCKET}" --kfp-namespace "${KFP_NAMESPACE}" \ @@ -279,4 +319,4 @@ fi DIR_THIS_FILE="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" -cd $DIR_THIS_FILE/../ && python -m pytest "${pytest_args[@]}" --junitxml ./integration_tests.log -n 10 +cd $DIR_THIS_FILE/../ && python -m pytest "${pytest_args[@]}" --junitxml ./integration_tests.log -n 9 diff --git a/components/aws/sagemaker/tests/integration_tests/utils/ack_utils.py b/components/aws/sagemaker/tests/integration_tests/utils/ack_utils.py new file mode 100644 index 00000000000..dea7f393162 --- /dev/null +++ b/components/aws/sagemaker/tests/integration_tests/utils/ack_utils.py @@ -0,0 +1,39 @@ +from time import sleep +from kubernetes import client,config +import os + +def k8s_client(): + return config.new_client_from_config() + +def _get_resource(k8s_client,job_name,kvars): + """Get the custom resource detail similar to: kubectl describe JOB_NAME -n NAMESPACE. + Returns: + None or object: None if the resource doesnt exist in server, otherwise the + custom object. + """ + _api = client.CustomObjectsApi(k8s_client) + namespace = os.environ.get("NAMESPACE") + job_description = _api.get_namespaced_custom_object( + kvars["group"].lower(), + kvars["version"].lower(), + namespace.lower(), + kvars["plural"].lower(), + job_name.lower() + ) + return job_description + +def describe_training_job(k8s_client,training_job_name): + training_vars = { + "group":"sagemaker.services.k8s.aws", + "version":"v1alpha1", + "plural":"trainingjobs", + } + return _get_resource(k8s_client,training_job_name,training_vars) + +#TODO: Make this a generalized function for non-job resources. +def wait_for_trainingjob_status(k8s_client,training_job_name,desiredStatuses,wait_periods,period_length): + for _ in range(wait_periods): + response = describe_training_job(k8s_client,training_job_name) + if response["status"]["trainingJobStatus"] in desiredStatuses:return True + sleep(period_length) + return False