Skip to content

Commit

Permalink
# This is a combination of 5 commits.
Browse files Browse the repository at this point in the history
# This is the 1st commit message:

parent 1551e34
author Dustin Luong <[email protected]> 1593625846 -0700
committer Dustin Luong <[email protected]> 1595961236 -0700

Unit test passes

Updated unit tests, working on integration tests

Updated READMEs, Logged rules status and errors to UI

    squash a94f809 Unfinished sample pipeline for debugger
    squash 8128572 Edge case: empty rules config

Edge case: empty rules config

Cleaned up example pipeline and fixed empty case of debug rules

Finished integration test, debug rule logs only print after training has completed, refactored various code

Unhardcoded rules registry

New sample pipeline, minor changes to utils

Refactored wait_for_debug_rules, added unit tests, updated readme for debugger demo, fixed typos and small errors

rm .gz

Changed defaults for train.template.yaml, updated example pipeline, removed exceptions from utils which are handled by boto3

removed trust.json

minor clean ups

Minor fixes

Refactored code to incorporate changes from design review, notably removing collection_config

# This is the commit message kubeflow#2:

custom_rules.py

# This is the commit message kubeflow#3:

Added tensorboard

# This is the commit message kubeflow#4:

restored run_integration_tests

# This is the commit message kubeflow#5:

removed custom rule
  • Loading branch information
dstnluong committed Jul 28, 2020
1 parent 1551e34 commit 50325db
Show file tree
Hide file tree
Showing 20 changed files with 110 additions and 445 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ The meeting is happening every other Wed 10-11AM (PST)
* How to create and deploy a Kubeflow Machine Learning Pipeline (By Lak Lakshmanan)
* [Part 1: How to create and deploy a Kubeflow Machine Learning Pipeline](https://towardsdatascience.com/how-to-create-and-deploy-a-kubeflow-machine-learning-pipeline-part-1-efea7a4b650f)
* [Part 2: How to deploy Jupyter notebooks as components of a Kubeflow ML pipeline](https://towardsdatascience.com/how-to-deploy-jupyter-notebooks-as-components-of-a-kubeflow-ml-pipeline-part-2-b1df77f4e5b3)
* [Part 3: How to carry out CI/CD in Machine Learning (“MLOps”) using Kubeflow ML pipelines](https://medium.com/google-cloud/how-to-carry-out-ci-cd-in-machine-learning-mlops-using-kubeflow-ml-pipelines-part-3-bdaf68082112)

## Acknowledgments

Expand Down
52 changes: 28 additions & 24 deletions components/aws/sagemaker/common/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,28 +205,21 @@ def create_training_job_request(args):

enable_spot_instance_support(request, args)

### Update DebuggerHookConfig, CollectionConfigurations, and DebugRuleConfigurations
### Update DebugHookConfig and DebugRuleConfigurations
if args['debug_hook_config']:
if 'CollectionConfigurations' in args['debug_hook_config']:
            logging.info('Existing CollectionConfigurations in debug_hook_config will be overwritten. Move and reformat into collection_config parameter')
raise Exception('Could not create job request')
request['DebugHookConfig'] = args['debug_hook_config']
request['DebugHookConfig']['CollectionConfigurations'] = []

if args['collection_config']:
for key, val in args['collection_config'].items():
request['DebugHookConfig']['CollectionConfigurations'].append({"CollectionName": key, "CollectionParameters": val})

if not args['debug_hook_config'] and not args['collection_config']:
else:
request.pop('DebugHookConfig')

if args['debug_rule_config']:
request['DebugRuleConfigurations'] = args['debug_rule_config']
else:
request.pop('DebugRuleConfigurations')

if args['debug_rule_config']:
request['DebugRuleConfigurations'] = args['debug_rule_config']
if args['tensorboard_output_config']:
request['TensorBoardOutputConfig'] = args['tensorboard_output_config']
else:
request.pop('TensorBoardOutputConfig')

### Update tags
for key, val in args['tags'].items():
Expand Down Expand Up @@ -267,18 +260,18 @@ def wait_for_training_job(client, training_job_name, poll_interval=31):


def wait_for_debug_rules(client, training_job_name, poll_interval=31):
first_poll = True
while(True):
response = client.describe_training_job(TrainingJobName=training_job_name)
if 'DebugRuleEvaluationStatuses' not in response:
break

if first_poll:
logging.info("Polling for status of all debug rules:")
first_poll = False
if debug_rules_completed(response):
logging.info("Rules have ended with status: ")
logging.info("Rules have ended with status:\n")
print_debug_rule_status(response, True)
if debug_rules_errored(response):
raise Exception('One or more debug rules have errored.')
break
logging.info('Debugger Rule Status:')
print_debug_rule_status(response)
time.sleep(poll_interval)

Expand All @@ -299,13 +292,24 @@ def debug_rules_completed(response):
return True


def print_debug_rule_status(response, verbose=False):
def print_debug_rule_status(response, last_print=False):
for debug_rule in response['DebugRuleEvaluationStatuses']:
logging.info(" - {}: {}".format(debug_rule['RuleConfigurationName'], debug_rule['RuleEvaluationStatus']))
if verbose and 'StatusDetails' in debug_rule:
logging.info(" - {}".format(debug_rule['StatusDetails']).rstrip())


line_ending = "\n" if last_print else ""
if 'StatusDetails' in debug_rule:
status_details = "- {}{}".format(debug_rule['StatusDetails'].rstrip(), line_ending)
line_ending = ""
else:
status_details = ""
rule_status = "- {}: {}{}".format(debug_rule['RuleConfigurationName'], debug_rule['RuleEvaluationStatus'], line_ending)
if debug_rule['RuleEvaluationStatus'] == "Error":
logging.error("{}".format(rule_status))
if last_print and status_details:
logging.error(" {}".format(status_details))
else:
logging.info(" {}".format(rule_status))
if last_print and status_details:
logging.info(" {}".format(status_details))
logging.info(50 * "-")

def get_model_artifacts_from_job(client, job_name):
info = client.describe_training_job(TrainingJobName=job_name)
Expand Down
6 changes: 3 additions & 3 deletions components/aws/sagemaker/common/train.template.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@

TrainingJobName: ''
HyperParameters: {}
AlgorithmSpecification:
TrainingImage: ''
Expand All @@ -21,9 +21,9 @@ VpcConfig:
StoppingCondition:
MaxRuntimeInSeconds: 86400
MaxWaitTimeInSeconds: 86400
DebugHookConfig:
CollectionConfigurations: []
DebugHookConfig: {}
DebugRuleConfigurations: []
TensorBoardOutputConfig: {}
CheckpointConfig:
S3Uri: ''
LocalPath: ''
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
pytest.param(
"resources/config/simple-mnist-training", marks=pytest.mark.canary_test
),
pytest.param(
"resources/config/fsx-mnist-training",
marks=pytest.mark.fsx_test
),
pytest.param(
"resources/config/xgboost-mnist-trainingjob-debugger",
marks=pytest.mark.canary_test
),
pytest.param(
"resources/config/fsx-mnist-training",
marks=pytest.mark.fsx_test
),
"resources/config/spot-sample-pipeline-training",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"test_file_dir",
[
pytest.param(
"resources/config/create-workteam",
"resources/config/create-workteam",
marks=pytest.mark.canary_test
)
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,23 @@ Arguments:
network_isolation: "True"
traffic_encryption: "False"
spot_instance: "False"
max_wait_time: 3600
max_wait_time: 3600
checkpoint_config: "{}"
debug_hook_config:
S3OutputPath: s3://((DATA_BUCKET))/xgboost-debugger/hookconfig
collection_config:
feature_importance:
save_interval: "5"
losses:
save_interval: "500"
average_shap:
save_interval: "5"
metrics:
save_interval: "5"
CollectionConfigurations:
- CollectionName: "feature_importance"
CollectionParameters:
save_interval: "5"
- CollectionName: "losses"
CollectionParameters:
save_interval: "500"
- CollectionName: "average_shap"
CollectionParameters:
save_interval: "5"
- CollectionName: "metrics"
CollectionParameters:
save_interval: "5"
debug_rule_config:
- RuleConfigurationName: LossNotDecreasing
RuleEvaluatorImage: ((BUILTIN_RULE_REGISTRY)).dkr.ecr.((REGION)).amazonaws.com/sagemaker-debugger-rules:latest
Expand All @@ -52,3 +56,4 @@ Arguments:
num_steps: "10"
rule_to_invoke: LossNotDecreasing
role: ((ROLE_ARN))

Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ def training_pipeline(
max_wait_time="",
checkpoint_config="",
debug_hook_config="",
collection_config="",
debug_rule_config="",
tensorboard_output_config="",
role="",
):
sagemaker_train_op(
Expand All @@ -54,8 +54,8 @@ def training_pipeline(
max_wait_time=max_wait_time,
checkpoint_config=checkpoint_config,
debug_hook_config=debug_hook_config,
collection_config=collection_config,
debug_rule_config=debug_rule_config,
tensorboard_output_config=tensorboard_output_config,
role=role,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ function cleanup() {
set +e

cleanup_kfp
delete_generated_role

if [[ "${SKIP_FSX_TESTS}" == "false" ]]; then
delete_fsx_instance
Expand All @@ -88,7 +89,6 @@ function cleanup() {

if [[ -z "${EKS_EXISTING_CLUSTER}" ]]; then
delete_eks
delete_generated_role
fi
}

Expand Down Expand Up @@ -206,4 +206,4 @@ fi

DIR_THIS_FILE="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"

cd $DIR_THIS_FILE/../ && python -m pytest "${pytest_args[@]}" --junitxml ./integration_tests.log -n $(nproc)
cd $DIR_THIS_FILE/../ && python -m pytest "${pytest_args[@]}" --junitxml ./integration_tests.log -n $(nproc)
102 changes: 45 additions & 57 deletions components/aws/sagemaker/tests/unit_tests/tests/test_train.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ def test_create_training_job(self):
mock_client = MagicMock()
mock_args = self.parser.parse_args(required_args + ['--job_name', 'test-job'])
response = _utils.create_training_job(mock_client, vars(mock_args))
print(response)

mock_client.create_training_job.assert_called_once_with(
AlgorithmSpecification={'TrainingImage': 'test-image', 'TrainingInputMode': 'File'},
Expand Down Expand Up @@ -158,16 +157,13 @@ def test_wait_for_errored_rule(self):
mock_client = MagicMock()
mock_client.describe_training_job.side_effect = [
{"DebugRuleEvaluationStatuses": [{"RuleConfigurationName": "rule1", "RuleEvaluationStatus": "InProgress"}, {"RuleConfigurationName": "rule2", "RuleEvaluationStatus": "InProgress"}]},
{"DebugRuleEvaluationStatuses": [{"RuleConfigurationName": "rule1", "RuleEvaluationStatus": "NoIssuesFound"}, {"RuleConfigurationName": "rule2", "RuleEvaluationStatus": "InProgress"}]},
{"DebugRuleEvaluationStatuses": [{"RuleConfigurationName": "rule1", "RuleEvaluationStatus": "NoIssuesFound"}, {"RuleConfigurationName": "rule2", "RuleEvaluationStatus": "Error"}]},
{"DebugRuleEvaluationStatuses": [{"RuleConfigurationName": "rule1", "RuleEvaluationStatus": "Error"}, {"RuleConfigurationName": "rule2", "RuleEvaluationStatus": "InProgress"}]},
{"DebugRuleEvaluationStatuses": [{"RuleConfigurationName": "rule1", "RuleEvaluationStatus": "Error"}, {"RuleConfigurationName": "rule2", "RuleEvaluationStatus": "NoIssuesFound"}]},
{"DebugRuleEvaluationStatuses": [{"RuleConfigurationName": "rule1", "RuleEvaluationStatus": "Should not be called"}, {"RuleConfigurationName": "rule2", "RuleEvaluationStatus": "Should not be called"}]},
]
with self.assertRaises(Exception):
_utils.wait_for_debug_rules(mock_client, 'training-job', 0)
_utils.wait_for_debug_rules(mock_client, 'training-job', 0)
self.assertEqual(mock_client.describe_training_job.call_count, 3)



def test_get_model_artifacts_from_job(self):
mock_client = MagicMock()
mock_client.describe_training_job.return_value = {"ModelArtifacts": {"S3ModelArtifacts": "s3://path/"}}
Expand Down Expand Up @@ -199,7 +195,6 @@ def test_reasonable_required_args(self):
self.assertEqual(response['AlgorithmSpecification']['TrainingInputMode'], 'File')
self.assertEqual(response['OutputDataConfig']['S3OutputPath'], 'test-path')


def test_metric_definitions(self):
metric_definition_args = self.parser.parse_args(required_args + ['--metric_definitions', '{"metric1": "regexval1", "metric2": "regexval2"}'])
response = _utils.create_training_job_request(vars(metric_definition_args))
Expand All @@ -215,26 +210,6 @@ def test_metric_definitions(self):
'Regex': "regexval2"
}])

def test_collection_config(self):
debug_hook_args = ['--debug_hook_config', '{"S3OutputPath":"s3://fake-uri/"}']
collection_definition_args = self.parser.parse_args(required_args + debug_hook_args + ['--collection_config', '{"collection1": {"key1": "value1"}, "collection2": {"key2": "value2", "key3": "value3"}}'])
response = _utils.create_training_job_request(vars(collection_definition_args))

self.assertIn('CollectionConfigurations', response['DebugHookConfig'])
response_collection_configurations = response['DebugHookConfig']['CollectionConfigurations']
self.assertEqual(response_collection_configurations, [{
'CollectionName': "collection1",
"CollectionParameters": {
"key1": "value1"
}
}, {
'CollectionName': "collection2",
'CollectionParameters': {
"key2": "value2",
"key3": "value3"
}
}])

def test_no_defined_image(self):
# Pass the image to pass the parser
no_image_args = required_args.copy()
Expand Down Expand Up @@ -373,36 +348,49 @@ def test_spot_bad_args(self):
with self.assertRaises(Exception):
_utils.create_training_job_request(vars(arg))

def test_hook_good_args(self):
good_args = self.parser.parse_args(required_args + ['--debug_hook_config', '{"S3OutputPath": "s3://fake-uri/", "LocalPath": "/local/path/", "HookParameters": {"key": "value"}}', '--collection_config', '{"collection1": {"key1": "value1"}}'])
response = _utils.create_training_job_request(vars(good_args))
self.assertEqual(response['DebugHookConfig']['S3OutputPath'], "s3://fake-uri/")
self.assertEqual(response['DebugHookConfig']['LocalPath'], "/local/path/")
self.assertEqual(response['DebugHookConfig']['HookParameters'], {"key": "value"})
self.assertEqual(response['DebugHookConfig']['CollectionConfigurations'], [{
"CollectionName": "collection1",
def test_hook_min_args(self):
good_args = self.parser.parse_args(required_args + ['--debug_hook_config', '{"S3OutputPath": "s3://fake-uri/"}'])
response = _utils.create_training_job_request(vars(good_args))
self.assertEqual(response['DebugHookConfig']['S3OutputPath'], "s3://fake-uri/")

def test_hook_max_args(self):
good_args = self.parser.parse_args(required_args + ['--debug_hook_config', '{"S3OutputPath": "s3://fake-uri/", "LocalPath": "/local/path/", "HookParameters": {"key": "value"}, "CollectionConfigurations": [{"CollectionName": "collection1", "CollectionParameters": {"key1": "value1"}}, {"CollectionName": "collection2", "CollectionParameters": {"key2": "value2", "key3": "value3"}}]}'])
response = _utils.create_training_job_request(vars(good_args))
self.assertEqual(response['DebugHookConfig']['S3OutputPath'], "s3://fake-uri/")
self.assertEqual(response['DebugHookConfig']['LocalPath'], "/local/path/")
self.assertEqual(response['DebugHookConfig']['HookParameters'], {"key": "value"})
self.assertEqual(response['DebugHookConfig']['CollectionConfigurations'], [
{
"CollectionName": "collection1",
"CollectionParameters": {
"key1": "value1"
}
}, {
"CollectionName": "collection2",
"CollectionParameters": {
"key1": "value1"
"key2": "value2",
"key3": "value3"
}
}])

def test_hook_bad_args(self):
config_in_hook_args = self.parser.parse_args(required_args + ['--debug_hook_config', '{"S3OutputPath": "s3://fake-uri/", "CollectionConfigurations": [{"CollectionName": "collection1", "CollectionParameters": {"key1": "value1"}}]}'])

for arg in [config_in_hook_args]:
with self.assertRaises(Exception):
_utils.create_training_job_request(vars(arg))

def test_rule_good_args(self):
good_args = self.parser.parse_args(required_args + ['--debug_rule_config', '[{"InstanceType": "ml.m4.xlarge", "LocalPath": "/local/path/", "RuleConfigurationName": "rule_name", "RuleEvaluatorImage": "test-image", "RuleParameters": {"key1": "value1"}, "S3OutputPath": "s3://fake-uri/", "VolumeSizeInGB": 1}]'])
response = _utils.create_training_job_request(vars(good_args))
self.assertEqual(response['DebugRuleConfigurations'][0]['InstanceType'], 'ml.m4.xlarge')
self.assertEqual(response['DebugRuleConfigurations'][0]['LocalPath'], '/local/path/')
self.assertEqual(response['DebugRuleConfigurations'][0]['RuleConfigurationName'], 'rule_name')
self.assertEqual(response['DebugRuleConfigurations'][0]['RuleEvaluatorImage'], 'test-image')
self.assertEqual(response['DebugRuleConfigurations'][0]['RuleParameters'], {"key1": "value1"})
self.assertEqual(response['DebugRuleConfigurations'][0]['S3OutputPath'], 's3://fake-uri/')
self.assertEqual(response['DebugRuleConfigurations'][0]['VolumeSizeInGB'], 1)
}
])

def test_rule_max_args(self):
good_args = self.parser.parse_args(required_args + ['--debug_rule_config', '[{"InstanceType": "ml.m4.xlarge", "LocalPath": "/local/path/", "RuleConfigurationName": "rule_name", "RuleEvaluatorImage": "test-image", "RuleParameters": {"key1": "value1"}, "S3OutputPath": "s3://fake-uri/", "VolumeSizeInGB": 1}]'])
response = _utils.create_training_job_request(vars(good_args))
self.assertEqual(response['DebugRuleConfigurations'][0]['InstanceType'], 'ml.m4.xlarge')
self.assertEqual(response['DebugRuleConfigurations'][0]['LocalPath'], '/local/path/')
self.assertEqual(response['DebugRuleConfigurations'][0]['RuleConfigurationName'], 'rule_name')
self.assertEqual(response['DebugRuleConfigurations'][0]['RuleEvaluatorImage'], 'test-image')
self.assertEqual(response['DebugRuleConfigurations'][0]['RuleParameters'], {"key1": "value1"})
self.assertEqual(response['DebugRuleConfigurations'][0]['S3OutputPath'], 's3://fake-uri/')
self.assertEqual(response['DebugRuleConfigurations'][0]['VolumeSizeInGB'], 1)

def test_rule_min_good_args(self):
good_args = self.parser.parse_args(required_args + ['--debug_rule_config', '[{"RuleConfigurationName": "rule_name", "RuleEvaluatorImage": "test-image"}]'])
response = _utils.create_training_job_request(vars(good_args))
self.assertEqual(response['DebugRuleConfigurations'][0]['RuleConfigurationName'], 'rule_name')
self.assertEqual(response['DebugRuleConfigurations'][0]['RuleEvaluatorImage'], 'test-image')

def test_spot_lesser_wait_time(self):
args = self.parser.parse_args(required_args + ['--spot_instance', 'True', '--max_wait_time', '3599', '--checkpoint_config', '{"S3Uri": "s3://fake-uri/", "LocalPath": "local-path"}'])
with self.assertRaises(Exception):
Expand All @@ -419,7 +407,7 @@ def test_spot_local_path(self):
args = self.parser.parse_args(required_args + ['--spot_instance', 'True', '--max_wait_time', '3600', '--checkpoint_config', '{"S3Uri": "s3://fake-uri/", "LocalPath": "local-path"}'])
response = _utils.create_training_job_request(vars(args))
self.assertEqual(response['CheckpointConfig']['S3Uri'], 's3://fake-uri/')
self.assertEqual(response['CheckpointConfig']['LocalPath'], 'local-path')
self.assertEqual(response['CheckpointConfig']['LocalPath'], 'local-path')

def test_tags(self):
args = self.parser.parse_args(required_args + ['--tags', '{"key1": "val1", "key2": "val2"}'])
Expand Down
2 changes: 1 addition & 1 deletion components/aws/sagemaker/train/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ spot_instance | Use managed spot training if true | No | Boolean | False, True |
max_wait_time | The maximum time in seconds you are willing to wait for a managed spot training job to complete | Yes | Int | ≤ 432000 (5 days) | 86400 (1 day) |
checkpoint_config | Dictionary of information about the output location for managed spot training checkpoint data | Yes | Dict | | {} |
debug_hook_config | Dictionary of configuration information for the debug hook parameters, collection configurations, and storage paths | Yes | Dict | | {} |
collection_config | Dictionary of configuration information for tensor collections | Yes | Dict | | {} |
collection_config | Dictionary of configuration information for tensor collections | Yes | Dict | | {} |
debug_rule_config | List of Configuration information for debugging rules. | Yes | List of Dicts | | [] |
tags | Key-value pairs to categorize AWS resources | Yes | Dict | | {} |

Expand Down
Loading

0 comments on commit 50325db

Please sign in to comment.