From 8f33cb085b984bd6ec3550787684b7da61e85347 Mon Sep 17 00:00:00 2001
From: Ben Sully
Date: Thu, 21 May 2020 21:41:55 +0100
Subject: [PATCH] Move 'secret_scope' field into inner credentials object to
 simplify Databricks storage

---
 .../environments/prod_databricks.yaml         |  9 +++---
 .../dagster_databricks/configs.py             | 30 ++++++++++++++-----
 .../databricks_step_main.py                   | 30 +++++++++----------
 .../dagster_databricks_tests/test_pyspark.py  | 21 +++++++------
 4 files changed, 52 insertions(+), 38 deletions(-)

diff --git a/examples/dagster_examples/simple_pyspark/environments/prod_databricks.yaml b/examples/dagster_examples/simple_pyspark/environments/prod_databricks.yaml
index 821c6f40e7718..c5c3777be3ea1 100644
--- a/examples/dagster_examples/simple_pyspark/environments/prod_databricks.yaml
+++ b/examples/dagster_examples/simple_pyspark/environments/prod_databricks.yaml
@@ -17,11 +17,10 @@ resources:
       local_pipeline_package_path: .
       staging_prefix: /dagster-databricks-tests
       storage:
-        credentials:
-          s3:
-            access_key_key: aws-access-key
-            secret_key_key: aws-secret-key
-        secret_scope: dagster-databricks-tests
+        s3:
+          secret_scope: dagster-databricks-tests
+          access_key_key: aws-access-key
+          secret_key_key: aws-secret-key
 solids:
   make_weather_samples:
     inputs:
diff --git a/python_modules/libraries/dagster-databricks/dagster_databricks/configs.py b/python_modules/libraries/dagster-databricks/dagster_databricks/configs.py
index 1010b66c39f26..302b6a835f545 100644
--- a/python_modules/libraries/dagster-databricks/dagster_databricks/configs.py
+++ b/python_modules/libraries/dagster-databricks/dagster_databricks/configs.py
@@ -531,6 +531,14 @@ def define_databricks_submit_run_config():
     )
 
 
+def _define_secret_scope():
+    return Field(
+        String,
+        description='The Databricks secret scope containing the storage secrets.',
+        is_required=True,
+    )
+
+
 def _define_s3_storage_credentials():
     access_key_key = Field(
         String,
@@ -543,7 +551,13 @@ def _define_s3_storage_credentials():
         is_required=True,
     )
     return Field(
-        Shape(fields={'access_key_key': access_key_key, 'secret_key_key': secret_key_key}),
+        Shape(
+            fields={
+                'secret_scope': _define_secret_scope(),
+                'access_key_key': access_key_key,
+                'secret_key_key': secret_key_key,
+            }
+        ),
         description='S3 storage secret configuration',
     )
 
@@ -562,6 +576,7 @@ def _define_adls2_storage_credentials():
     return Field(
         Shape(
             fields={
+                'secret_scope': _define_secret_scope(),
                 'storage_account_name': storage_account_name,
                 'storage_account_key_key': storage_account_key_key,
             }
@@ -579,13 +594,14 @@ def _define_storage_credentials():
 
 
 def define_databricks_storage_config():
-    secret_scope = Field(
-        String,
-        description='The Databricks secret scope containing the storage secrets.',
-        is_required=True,
-    )
     return Field(
-        Shape(fields={'secret_scope': secret_scope, 'credentials': _define_storage_credentials()}),
+        Selector(
+            {
+                's3': _define_s3_storage_credentials()
+                # TODO: uncomment when dagster-azure is merged.
+                # 'adls2': _define_adls2_storage_credentials(),
+            }
+        ),
         description='Databricks storage configuration. Solids using the '
         'DatabricksPySparkStepLauncher to execute pipeline steps in Databricks MUST configure '
         'storage using this config (either S3 or ADLS2 can be used). Access credentials for the '
diff --git a/python_modules/libraries/dagster-databricks/dagster_databricks/databricks_step_main.py b/python_modules/libraries/dagster-databricks/dagster_databricks/databricks_step_main.py
index 5a0cd236f8b99..9581a5a36b696 100644
--- a/python_modules/libraries/dagster-databricks/dagster_databricks/databricks_step_main.py
+++ b/python_modules/libraries/dagster-databricks/dagster_databricks/databricks_step_main.py
@@ -26,15 +26,18 @@
 os.environ['DATABRICKS_TOKEN'] = ''
 
 
-def setup_s3_storage(scope, credentials):
+def setup_s3_storage(storage):
     '''Obtain AWS credentials from Databricks secrets and export so both Spark and boto can use them.
     '''
+
+    scope = storage['secret_scope']
+
     # dbutils is globally defined in the Databricks runtime
     access_key = dbutils.secrets.get(  # noqa  # pylint: disable=undefined-variable
-        scope=scope, key=credentials['access_key_key']
+        scope=scope, key=storage['access_key_key']
     )
     secret_key = dbutils.secrets.get(  # noqa  # pylint: disable=undefined-variable
-        scope=scope, key=credentials['secret_key_key']
+        scope=scope, key=storage['secret_key_key']
     )
 
     # Spark APIs will use this.
@@ -52,18 +55,18 @@ def setup_s3_storage(scope, credentials):
     os.environ['AWS_SECRET_ACCESS_KEY'] = secret_key
 
 
-def setup_adls2_storage(scope, credentials):
+def setup_adls2_storage(storage):
     '''Obtain an Azure Storage Account key from Databricks secrets and export so Spark can use it.
     '''
     # dbutils is globally defined in the Databricks runtime
     storage_account_key = dbutils.secrets.get(  # noqa  # pylint: disable=undefined-variable
-        scope=scope, key=credentials['storage_account_key_key']
+        scope=storage['secret_scope'], key=storage['storage_account_key_key']
     )
     # Spark APIs will use this.
     # See https://docs.microsoft.com/en-gb/azure/databricks/data/data-sources/azure/azure-datalake-gen2#--access-directly-using-the-storage-account-access-key
     # sc is globally defined in the Databricks runtime and points to the Spark context
     sc._jsc.hadoopConfiguration().set(  # noqa  # pylint: disable=undefined-variable,protected-access
-        'fs.azure.account.key.{}.dfs.core.windows.net'.format(credentials['storage_account_name']),
+        'fs.azure.account.key.{}.dfs.core.windows.net'.format(storage['storage_account_name']),
         storage_account_key,
     )
 
@@ -83,21 +86,18 @@ def setup_storage(step_run_ref):
     storage = step_run_ref.environment_dict['resources']['pyspark_step_launcher']['config'][
         'storage'
     ]
-    scope = storage['secret_scope']
-    credentials = storage['credentials']
-
     check.invariant(
-        len(credentials) == 1, 'No valid storage credentials found in pyspark_step_launcher config'
+        len(storage) == 1, 'No valid storage credentials found in pyspark_step_launcher config'
     )
     check.invariant(
-        list(credentials)[0] == list(root_storage)[0],
+        list(storage)[0] == list(root_storage)[0],
         "Storage credentials in step launcher config don't match root storage",
     )
 
-    if 's3' in credentials:
-        setup_s3_storage(scope, credentials['s3'])
-    elif 'adls2' in credentials:
-        setup_adls2_storage(scope, credentials['adls2'])
+    if 's3' in storage:
+        setup_s3_storage(storage['s3'])
+    elif 'adls2' in storage:
+        setup_adls2_storage(storage['adls2'])
     else:
         raise Exception('No valid storage found!')
 
diff --git a/python_modules/libraries/dagster-databricks/dagster_databricks_tests/test_pyspark.py b/python_modules/libraries/dagster-databricks/dagster_databricks_tests/test_pyspark.py
index 8acb10bfa3048..47a40f82d51de 100644
--- a/python_modules/libraries/dagster-databricks/dagster_databricks_tests/test_pyspark.py
+++ b/python_modules/libraries/dagster-databricks/dagster_databricks_tests/test_pyspark.py
@@ -51,10 +51,11 @@
         ],
     },
     'storage': {
-        'secret_scope': 'dagster-databricks-tests',
-        'credentials': {
-            's3': {'access_key_key': 'aws-access-key', 'secret_key_key': 'aws-secret-key'}
-        },
+        's3': {
+            'secret_scope': 'dagster-databricks-tests',
+            'access_key_key': 'aws-access-key',
+            'secret_key_key': 'aws-secret-key',
+        }
     },
 }
 
@@ -204,13 +205,11 @@ def test_do_it_live_databricks_s3():
 def test_do_it_live_databricks_adls2():
     config = BASE_DATABRICKS_PYSPARK_STEP_LAUNCHER_CONFIG.copy()
     config['storage'] = {
-        'secret_scope': 'dagster-databricks-tests',
-        'credentials': {
-            'adls2': {
-                'storage_account_name': ADLS2_STORAGE_ACCOUNT,
-                'storage_account_key_key': 'adls2-storage-key',
-            }
-        },
+        'adls2': {
+            'secret_scope': 'dagster-databricks-tests',
+            'storage_account_name': ADLS2_STORAGE_ACCOUNT,
+            'storage_account_key_key': 'adls2-storage-key',
+        }
     }
 
     result = execute_pipeline(