Skip to content

Commit

Permalink
Move 'secret_scope' field into inner credentials object to simplify Databricks storage
Browse files Browse the repository at this point in the history
  • Loading branch information
sd2k committed May 21, 2020
1 parent 885d8b1 commit 8f33cb0
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@ resources:
local_pipeline_package_path: .
staging_prefix: /dagster-databricks-tests
storage:
credentials:
s3:
access_key_key: aws-access-key
secret_key_key: aws-secret-key
secret_scope: dagster-databricks-tests
s3:
secret_scope: dagster-databricks-tests
access_key_key: aws-access-key
secret_key_key: aws-secret-key
solids:
make_weather_samples:
inputs:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,14 @@ def define_databricks_submit_run_config():
)


def _define_secret_scope():
    """Build the shared config field naming the Databricks secret scope that holds storage secrets."""
    # Required string field; both the S3 and ADLS2 credential shapes embed it.
    scope_field = Field(
        String,
        description='The Databricks secret scope containing the storage secrets.',
        is_required=True,
    )
    return scope_field


def _define_s3_storage_credentials():
access_key_key = Field(
String,
Expand All @@ -543,7 +551,13 @@ def _define_s3_storage_credentials():
is_required=True,
)
return Field(
Shape(fields={'access_key_key': access_key_key, 'secret_key_key': secret_key_key}),
Shape(
fields={
'secret_scope': _define_secret_scope(),
'access_key_key': access_key_key,
'secret_key_key': secret_key_key,
}
),
description='S3 storage secret configuration',
)

Expand All @@ -562,6 +576,7 @@ def _define_adls2_storage_credentials():
return Field(
Shape(
fields={
'secret_scope': _define_secret_scope(),
'storage_account_name': storage_account_name,
'storage_account_key_key': storage_account_key_key,
}
Expand All @@ -579,13 +594,14 @@ def _define_storage_credentials():


def define_databricks_storage_config():
secret_scope = Field(
String,
description='The Databricks secret scope containing the storage secrets.',
is_required=True,
)
return Field(
Shape(fields={'secret_scope': secret_scope, 'credentials': _define_storage_credentials()}),
Selector(
{
's3': _define_s3_storage_credentials()
# TODO: uncomment when dagster-azure is merged.
# 'adls2': _define_adls2_storage_credentials(),
}
),
description='Databricks storage configuration. Solids using the '
'DatabricksPySparkStepLauncher to execute pipeline steps in Databricks MUST configure '
'storage using this config (either S3 or ADLS2 can be used). Access credentials for the '
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,18 @@
os.environ['DATABRICKS_TOKEN'] = ''


def setup_s3_storage(scope, credentials):
def setup_s3_storage(storage):
'''Obtain AWS credentials from Databricks secrets and export so both Spark and boto can use them.
'''

scope = storage['secret_scope']

# dbutils is globally defined in the Databricks runtime
access_key = dbutils.secrets.get( # noqa # pylint: disable=undefined-variable
scope=scope, key=credentials['access_key_key']
scope=scope, key=storage['access_key_key']
)
secret_key = dbutils.secrets.get( # noqa # pylint: disable=undefined-variable
scope=scope, key=credentials['secret_key_key']
scope=scope, key=storage['secret_key_key']
)

# Spark APIs will use this.
Expand All @@ -52,18 +55,18 @@ def setup_s3_storage(scope, credentials):
os.environ['AWS_SECRET_ACCESS_KEY'] = secret_key


def setup_adls2_storage(scope, credentials):
def setup_adls2_storage(storage):
'''Obtain an Azure Storage Account key from Databricks secrets and export so Spark can use it.
'''
# dbutils is globally defined in the Databricks runtime
storage_account_key = dbutils.secrets.get( # noqa # pylint: disable=undefined-variable
scope=scope, key=credentials['storage_account_key_key']
scope=storage['secret_scope'], key=storage['storage_account_key_key']
)
# Spark APIs will use this.
# See https://docs.microsoft.com/en-gb/azure/databricks/data/data-sources/azure/azure-datalake-gen2#--access-directly-using-the-storage-account-access-key
# sc is globally defined in the Databricks runtime and points to the Spark context
sc._jsc.hadoopConfiguration().set( # noqa # pylint: disable=undefined-variable,protected-access
'fs.azure.account.key.{}.dfs.core.windows.net'.format(credentials['storage_account_name']),
'fs.azure.account.key.{}.dfs.core.windows.net'.format(storage['storage_account_name']),
storage_account_key,
)

Expand All @@ -83,21 +86,18 @@ def setup_storage(step_run_ref):
storage = step_run_ref.environment_dict['resources']['pyspark_step_launcher']['config'][
'storage'
]
scope = storage['secret_scope']
credentials = storage['credentials']

check.invariant(
len(credentials) == 1, 'No valid storage credentials found in pyspark_step_launcher config'
len(storage) == 1, 'No valid storage credentials found in pyspark_step_launcher config'
)
check.invariant(
list(credentials)[0] == list(root_storage)[0],
list(storage)[0] == list(root_storage)[0],
"Storage credentials in step launcher config don't match root storage",
)

if 's3' in credentials:
setup_s3_storage(scope, credentials['s3'])
elif 'adls2' in credentials:
setup_adls2_storage(scope, credentials['adls2'])
if 's3' in storage:
setup_s3_storage(storage['s3'])
elif 'adls2' in storage:
setup_adls2_storage(storage['adls2'])
else:
raise Exception('No valid storage found!')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@
],
},
'storage': {
'secret_scope': 'dagster-databricks-tests',
'credentials': {
's3': {'access_key_key': 'aws-access-key', 'secret_key_key': 'aws-secret-key'}
},
's3': {
'secret_scope': 'dagster-databricks-tests',
'access_key_key': 'aws-access-key',
'secret_key_key': 'aws-secret-key',
}
},
}

Expand Down Expand Up @@ -204,13 +205,11 @@ def test_do_it_live_databricks_s3():
def test_do_it_live_databricks_adls2():
config = BASE_DATABRICKS_PYSPARK_STEP_LAUNCHER_CONFIG.copy()
config['storage'] = {
'secret_scope': 'dagster-databricks-tests',
'credentials': {
'adls2': {
'storage_account_name': ADLS2_STORAGE_ACCOUNT,
'storage_account_key_key': 'adls2-storage-key',
}
},
'adls2': {
'secret_scope': 'dagster-databricks-tests',
'storage_account_name': ADLS2_STORAGE_ACCOUNT,
'storage_account_key_key': 'adls2-storage-key',
}
}

result = execute_pipeline(
Expand Down

0 comments on commit 8f33cb0

Please sign in to comment.