From fe550db32b9059f2b5b7ef4c973963de1a019ff5 Mon Sep 17 00:00:00 2001 From: David Katz <41651296+DavidKatz-il@users.noreply.github.com> Date: Fri, 14 Aug 2020 00:46:13 +0300 Subject: [PATCH] add file_type 'other' --- .../libraries/dagster-pyspark/dagster_pyspark/types.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/python_modules/libraries/dagster-pyspark/dagster_pyspark/types.py b/python_modules/libraries/dagster-pyspark/dagster_pyspark/types.py index c954b162331a9..14cf50815b7bd 100644 --- a/python_modules/libraries/dagster-pyspark/dagster_pyspark/types.py +++ b/python_modules/libraries/dagster-pyspark/dagster_pyspark/types.py @@ -317,6 +317,7 @@ def dict_without_keys(ddict, *keys): ), } ), + 'other': Permissive(), } ) ) @@ -344,6 +345,8 @@ def dataframe_materializer(_context, config, spark_df): elif file_type == 'text': spark_df.write.text(**file_options) return AssetMaterialization.file(file_options['path']) + elif file_type == 'other': + return spark_df.write.save(**file_options) else: raise DagsterInvariantViolationError('Unsupported file_type {}'.format(file_type)) @@ -920,6 +923,7 @@ def dataframe_materializer(_context, config, spark_df): ), } ), + 'other': Permissive(), }, ), required_resource_keys={'pyspark'}, @@ -943,6 +947,8 @@ def dataframe_loader(_context, config): return spark_read.table(**file_options) elif file_type == 'text': return spark_read.text(path, **dict_without_keys(file_options, 'path')) + elif file_type == 'other': + return spark_read.load(**file_options) else: raise DagsterInvariantViolationError( 'Unsupported file_type {file_type}'.format(file_type=file_type)