diff --git a/python_modules/libraries/dagster-azure/.coveragerc b/python_modules/libraries/dagster-azure/.coveragerc new file mode 100644 index 0000000000000..398ff08afa472 --- /dev/null +++ b/python_modules/libraries/dagster-azure/.coveragerc @@ -0,0 +1,2 @@ +[run] +branch = True diff --git a/python_modules/libraries/dagster-azure/LICENSE b/python_modules/libraries/dagster-azure/LICENSE new file mode 100644 index 0000000000000..8dada3edaf50d --- /dev/null +++ b/python_modules/libraries/dagster-azure/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. 
The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/python_modules/libraries/dagster-azure/MANIFEST.in b/python_modules/libraries/dagster-azure/MANIFEST.in new file mode 100644 index 0000000000000..480c841039671 --- /dev/null +++ b/python_modules/libraries/dagster-azure/MANIFEST.in @@ -0,0 +1,5 @@ +recursive-include dagster_azure *.sh +recursive-include dagster_azure *.yaml +recursive-include dagster_azure *.txt +recursive-include dagster_azure *.template +include LICENSE diff --git a/python_modules/libraries/dagster-azure/dagster_azure/__init__.py b/python_modules/libraries/dagster-azure/dagster_azure/__init__.py new file mode 100644 index 0000000000000..41b391e118771 --- /dev/null +++ b/python_modules/libraries/dagster-azure/dagster_azure/__init__.py @@ -0,0 +1,5 @@ +from dagster.core.utils import check_dagster_package_version + +from .version import __version__ + +check_dagster_package_version('dagster-azure', __version__) diff --git a/python_modules/libraries/dagster-azure/dagster_azure/adls2/__init__.py b/python_modules/libraries/dagster-azure/dagster_azure/adls2/__init__.py new file mode 100644 index 0000000000000..c2246756a6250 --- /dev/null +++ b/python_modules/libraries/dagster-azure/dagster_azure/adls2/__init__.py @@ -0,0 +1,11 @@ +from .adls2_fake_resource import ADLS2FakeClient, create_adls2_fake_resource +from .file_cache import ADLS2FileCache, adls2_file_cache +from .file_manager import ADLS2FileHandle, ADLS2FileManager +from .intermediate_store import ADLS2IntermediateStore +from .object_store import ADLS2ObjectStore +from .resources import adls2_resource +from .system_storage import adls2_plus_default_storage_defs, adls2_system_storage +from .utils import create_adls2_client + +# from .compute_log_manager import ADLS2ComputeLogManager +# from .solids import ADLS2Coordinate, file_handle_to_adls2 diff --git a/python_modules/libraries/dagster-azure/dagster_azure/adls2/adls2_fake_resource.py b/python_modules/libraries/dagster-azure/dagster_azure/adls2/adls2_fake_resource.py new file mode 100644 index 0000000000000..d2c4a85682c6a --- /dev/null +++ b/python_modules/libraries/dagster-azure/dagster_azure/adls2/adls2_fake_resource.py @@ -0,0 +1,143 @@ +""" +TODO sd2k: implement. 
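+
+This module provides stateful, in-memory fakes of the ADLS2 clients for use in
+tests: ``ADLS2FakeClient`` stands in for a ``DataLakeServiceClient``, backed by
+the filesystem, file and downloader fakes below, which use plain dicts instead
+of real storage.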
+""" +from collections import defaultdict +from contextlib import contextmanager +import io +import random + +from azure.core.exceptions import ResourceNotFoundError + +from dagster.seven import mock + + +def create_adls2_fake_resource(account_name): + '''Create a mock ADLS2 session for test.''' + return ADLS2FakeClient(account_name, 'fake-creds') + + +class ADLS2FakeClient(object): + '''Stateful mock of an ADLS2 client for testing. + + Wraps a ``mock.MagicMock``. Containers are implemented using an in-memory dict. + ''' + + def __init__(self, account_name, credential): + + self._account_name = account_name + self._credential = mock.MagicMock() + self._credential.account_key = credential + self._file_systems = {} + + @property + def account_name(self): + return self._account_name + + @property + def credential(self): + return self._credential + + @property + def file_systems(self): + return self._file_systems + + def get_file_system_client(self, file_system): + return self._file_systems.setdefault( + file_system, ADLS2FakeFilesystemClient(self.account_name, file_system) + ) + + def get_file_client(self, file_system, file_path): + return self.get_file_system_client(file_system).get_file_client(file_path) + + +class ADLS2FakeFilesystemClient(object): + '''Stateful mock of an ADLS2 filesystem client for testing.''' + + def __init__(self, account_name, file_system_name): + self._file_system = defaultdict(ADLS2FakeFileClient) + self._account_name = account_name + self._file_system_name = file_system_name + + @property + def account_name(self): + return self._account_name + + @property + def file_system_name(self): + return self._file_system_name + + def keys(self): + return self._file_system.keys() + + def get_file_system_properties(self): + return {"account_name": self.account_name, "file_system_name": self.file_system_name} + + def has_file(self, path): + return bool(self._file_system.get(path)) + + def get_file_client(self, file_path): + return self._file_system[file_path] + + def create_file(self, file): + return self._file_system[file] + + def delete_file(self, file): + for k in list(self._file_system.keys()): + if k.startswith(file): + del self._file_system[k] + + +class ADLS2FakeFileClient(object): + '''Stateful mock of an ADLS2 file client for testing.''' + + def __init__(self): + self.contents = None + self.lease = None + + def get_file_properties(self): + if self.contents is None: + raise ResourceNotFoundError("File does not exist!") + return {"lease": self.lease} + + def upload_data(self, contents, overwrite=False, lease=None): + if self.lease is not None: + if lease != self.lease: + raise Exception("Invalid lease!") + if self.contents is not None or overwrite is True: + if isinstance(contents, str): + self.contents = contents.encode('utf8') + elif isinstance(contents, io.BytesIO): + self.contents = contents.read() + elif isinstance(contents, io.StringIO): + self.contents = contents.read().encode('utf8') + elif isinstance(contents, bytes): + self.contents = contents + else: + print("Uploading unknown data") + self.contents = contents + + @contextmanager + def acquire_lease(self, lease_duration=-1): # pylint: disable=unused-argument + if self.lease is None: + self.lease = random.randint(0, 2 ** 9) + try: + yield self.lease + finally: + self.lease = None + else: + raise Exception("Lease already held") + + def download_file(self): + if self.contents is None: + raise ResourceNotFoundError("File does not exist!") + return ADLS2FakeFileDownloader(contents=self.contents) + + +class 
ADLS2FakeFileDownloader(object): + '''Mock of an ADLS2 file downloader for testing.''' + + def __init__(self, contents): + self.contents = contents + + def readall(self): + return self.contents diff --git a/python_modules/libraries/dagster-azure/dagster_azure/adls2/file_cache.py b/python_modules/libraries/dagster-azure/dagster_azure/adls2/file_cache.py new file mode 100644 index 0000000000000..6468039dd86cc --- /dev/null +++ b/python_modules/libraries/dagster-azure/dagster_azure/adls2/file_cache.py @@ -0,0 +1,74 @@ +from azure.core.exceptions import ResourceNotFoundError + +from dagster import Field, String, StringSource, Selector, check, resource +from dagster.core.storage.file_cache import FileCache + +from .file_manager import ADLS2FileHandle +from .utils import create_adls2_client + + +class ADLS2FileCache(FileCache): + def __init__( + self, storage_account, file_system, prefix, credential=None, overwrite=False, client=None + ): + super(ADLS2FileCache, self).__init__(overwrite=overwrite) + + self.storage_account = storage_account + self.file_system = file_system + self.prefix = prefix + + self.client = client or create_adls2_client(storage_account, credential) + + def has_file_object(self, file_key): + check.str_param(file_key, 'file_key') + try: + file = self.client.get_file_client(self.file_system, self.get_full_key(file_key)) + file.get_file_properties() + except ResourceNotFoundError: + return False + return True + + def get_full_key(self, file_key): + return '{base_key}/{file_key}'.format(base_key=self.prefix, file_key=file_key) + + def write_file_object(self, file_key, source_file_object): + check.str_param(file_key, 'file_key') + + adls2_key = self.get_full_key(file_key) + adls2_file = self.client.get_file_client(file_system=self.file_system, file_path=adls2_key) + adls2_file.upload_data(source_file_object, overwrite=True) + return self.get_file_handle(file_key) + + def get_file_handle(self, file_key): + check.str_param(file_key, 'file_key') + return ADLS2FileHandle( + self.client.account_name, self.file_system, self.get_full_key(file_key) + ) + + +@resource( + { + 'storage_account': Field(String, description='The storage account name.'), + 'credential': Field( + Selector( + { + 'sas': Field(StringSource, description='SAS token for the account.'), + 'key': Field(StringSource, description='Shared Access Key for the account'), + } + ), + description='The credentials with which to authenticate.', + ), + 'prefix': Field(String, description='The base path prefix to use in ADLS2'), + 'file_system': Field(String, description='The storage account filesystem (aka container)'), + 'overwrite': Field(bool, is_required=False, default_value=False), + } +) +def adls2_file_cache(init_context): + return ADLS2FileCache( + storage_account=init_context.resource_config['storage_account'], + file_system=init_context.resource_config['file_system'], + prefix=init_context.resource_config['prefix'], + credential=init_context.resource_config['credential'], + overwrite=init_context.resource_config['overwrite'], + # TODO: resource dependencies + ) diff --git a/python_modules/libraries/dagster-azure/dagster_azure/adls2/file_manager.py b/python_modules/libraries/dagster-azure/dagster_azure/adls2/file_manager.py new file mode 100644 index 0000000000000..a63bbeb5b0655 --- /dev/null +++ b/python_modules/libraries/dagster-azure/dagster_azure/adls2/file_manager.py @@ -0,0 +1,109 @@ +import io +import uuid +from contextlib import contextmanager + +from dagster import check, usable_as_dagster_type +from 
dagster.core.storage.file_manager import ( + FileHandle, + FileManager, + TempfileManager, + check_file_like_obj, +) + + +@usable_as_dagster_type +class ADLS2FileHandle(FileHandle): + def __init__(self, account, file_system, key): + self._account = check.str_param(account, 'account') + self._file_system = check.str_param(file_system, 'file_system') + self._key = check.str_param(key, 'key') + + @property + def account(self): + return self._account + + @property + def file_system(self): + return self._file_system + + @property + def key(self): + return self._key + + @property + def path_desc(self): + return self.adls2_path + + @property + def adls2_path(self): + return 'adfss://{file_system}@{account}.dfs.core.windows.net/{key}'.format( + file_system=self.file_system, account=self.account, key=self.key, + ) + + +class ADLS2FileManager(FileManager): + def __init__(self, adls2_client, file_system, prefix): + self._client = adls2_client + self._file_system = check.str_param(file_system, 'file_system') + self._prefix = check.str_param(prefix, 'prefix') + self._local_handle_cache = {} + self._temp_file_manager = TempfileManager() + + def copy_handle_to_local_temp(self, file_handle): + self._download_if_not_cached(file_handle) + return self._get_local_path(file_handle) + + def _download_if_not_cached(self, file_handle): + if not self._file_handle_cached(file_handle): + # instigate download + temp_file_obj = self._temp_file_manager.tempfile() + temp_name = temp_file_obj.name + file = self._client.get_file_client( + file_system=file_handle.file_system, file_path=file_handle.key, + ) + download = file.download_file() + with open(temp_name, 'wb') as file_obj: + download.readinto(file_obj) + self._local_handle_cache[file_handle.adls2_path] = temp_name + + return file_handle + + @contextmanager + def read(self, file_handle, mode='rb'): + check.inst_param(file_handle, 'file_handle', ADLS2FileHandle) + check.str_param(mode, 'mode') + check.param_invariant(mode in {'r', 'rb'}, 'mode') + + self._download_if_not_cached(file_handle) + + with open(self._get_local_path(file_handle), mode) as file_obj: + yield file_obj + + def _file_handle_cached(self, file_handle): + return file_handle.adls2_path in self._local_handle_cache + + def _get_local_path(self, file_handle): + return self._local_handle_cache[file_handle.adls2_path] + + def read_data(self, file_handle): + with self.read(file_handle, mode='rb') as file_obj: + return file_obj.read() + + def write_data(self, data, ext=None): + check.inst_param(data, 'data', bytes) + return self.write(io.BytesIO(data), mode='wb', ext=ext) + + def write(self, file_obj, mode='wb', ext=None): # pylint: disable=unused-argument + check_file_like_obj(file_obj) + adls2_key = self.get_full_key(str(uuid.uuid4()) + (('.' 
+ ext) if ext is not None else '')) + adls2_file = self._client.get_file_client( + file_system=self._file_system, file_path=adls2_key + ) + adls2_file.upload_data(file_obj, overwrite=True) + return ADLS2FileHandle(self._client.account_name, self._file_system, adls2_key) + + def get_full_key(self, file_key): + return '{base_key}/{file_key}'.format(base_key=self._prefix, file_key=file_key) + + def delete_local_temp(self): + self._temp_file_manager.close() diff --git a/python_modules/libraries/dagster-azure/dagster_azure/adls2/intermediate_store.py b/python_modules/libraries/dagster-azure/dagster_azure/adls2/intermediate_store.py new file mode 100644 index 0000000000000..29862d33d9af1 --- /dev/null +++ b/python_modules/libraries/dagster-azure/dagster_azure/adls2/intermediate_store.py @@ -0,0 +1,51 @@ +from dagster import check +from dagster.core.storage.intermediate_store import IntermediateStore +from dagster.core.storage.type_storage import TypeStoragePluginRegistry +from dagster.seven import urlparse + +from .object_store import ADLS2ObjectStore + + +class ADLS2IntermediateStore(IntermediateStore): + '''Intermediate store using Azure Data Lake Storage Gen2. + + This intermediate store uses ADLS2 APIs to communicate with the storage, + which are better optimised for various tasks than regular Blob storage. + + If your storage account does not have the ADLS Gen2 hierarchical namespace enabled + this will not work: use the + :py:class:`~dagster_azure.blob.intermediate_store.AzureBlobIntermediateStore` + instead. + ''' + + def __init__( + self, file_system, run_id, client, type_storage_plugin_registry=None, prefix='dagster', + ): + check.str_param(file_system, 'file_system') + check.str_param(prefix, 'prefix') + check.str_param(run_id, 'run_id') + + object_store = ADLS2ObjectStore( + client.account_name, + # client.credential is non-null if a secret key was used to authenticate + client.credential.account_key if client.credential is not None + # otherwise the SAS token will be in the query string of the URL + else urlparse(client.url).query, + file_system, + ) + + def root_for_run_id(r_id): + return object_store.key_for_paths([prefix, 'storage', r_id]) + + super(ADLS2IntermediateStore, self).__init__( + object_store, + root_for_run_id=root_for_run_id, + run_id=run_id, + type_storage_plugin_registry=check.inst_param( + type_storage_plugin_registry + if type_storage_plugin_registry + else TypeStoragePluginRegistry(types_to_register=[]), + 'type_storage_plugin_registry', + TypeStoragePluginRegistry, + ), + ) diff --git a/python_modules/libraries/dagster-azure/dagster_azure/adls2/object_store.py b/python_modules/libraries/dagster-azure/dagster_azure/adls2/object_store.py new file mode 100644 index 0000000000000..7ff8fa7c872c0 --- /dev/null +++ b/python_modules/libraries/dagster-azure/dagster_azure/adls2/object_store.py @@ -0,0 +1,150 @@ +import logging +import re +import sys +from io import BytesIO, StringIO + +from azure.core.exceptions import ResourceNotFoundError + +from dagster import check +from dagster.core.definitions.events import ObjectStoreOperation, ObjectStoreOperationType +from dagster.core.storage.object_store import ObjectStore +from dagster.core.types.marshal import SerializationStrategy + +from .utils import create_adls2_client, create_blob_client + +DEFAULT_LEASE_DURATION = 60 * 60 # One hour + + +class ADLS2ObjectStore(ObjectStore): + def __init__( + self, storage_account, credential, file_system, lease_duration=DEFAULT_LEASE_DURATION + ): + self.adls2_client = 
create_adls2_client(storage_account, credential) + self.file_system_client = self.adls2_client.get_file_system_client(file_system) + # We also need a blob client to handle copying as ADLS doesn't have a copy API yet + self.blob_client = create_blob_client(storage_account, credential) + self.blob_container_client = self.blob_client.get_container_client(file_system) + + self.lease_duration = lease_duration + self.file_system_client.get_file_system_properties() + super(ADLS2ObjectStore, self).__init__('adls2', sep='/') + + def set_object(self, key, obj, serialization_strategy=None): + check.str_param(key, 'key') + + logging.info('Writing ADLS2 object at: ' + self.uri_for_key(key)) + + # cannot check obj since could be arbitrary Python object + check.inst_param( + serialization_strategy, 'serialization_strategy', SerializationStrategy + ) # cannot be none here + + if self.has_object(key): + logging.warning('Removing existing ADLS2 key: {key}'.format(key=key)) + self.rm_object(key) + + file = self.file_system_client.create_file(key) + with file.acquire_lease(self.lease_duration) as lease: + with BytesIO() as bytes_io: + if serialization_strategy.write_mode == 'w' and sys.version_info >= (3, 0): + with StringIO() as string_io: + string_io = StringIO() + serialization_strategy.serialize(obj, string_io) + string_io.seek(0) + bytes_io.write(string_io.read().encode('utf-8')) + else: + serialization_strategy.serialize(obj, bytes_io) + bytes_io.seek(0) + file.upload_data(bytes_io, lease=lease, overwrite=True) + + return ObjectStoreOperation( + op=ObjectStoreOperationType.SET_OBJECT, + key=self.uri_for_key(key), + dest_key=None, + obj=obj, + serialization_strategy_name=serialization_strategy.name, + object_store_name=self.name, + ) + + def get_object(self, key, serialization_strategy=None): + check.str_param(key, 'key') + check.param_invariant(len(key) > 0, 'key') + check.inst_param( + serialization_strategy, 'serialization_strategy', SerializationStrategy + ) # cannot be none here + + # FIXME we need better error handling for object store + file = self.file_system_client.get_file_client(key) + stream = file.download_file() + obj = serialization_strategy.deserialize( + BytesIO(stream.readall()) + if serialization_strategy.read_mode == 'rb' + else StringIO(stream.readall().decode(serialization_strategy.encoding)) + ) + return ObjectStoreOperation( + op=ObjectStoreOperationType.GET_OBJECT, + key=self.uri_for_key(key), + dest_key=None, + obj=obj, + serialization_strategy_name=serialization_strategy.name, + object_store_name=self.name, + ) + + def has_object(self, key): + check.str_param(key, 'key') + check.param_invariant(len(key) > 0, 'key') + + try: + file = self.file_system_client.get_file_client(key) + file.get_file_properties() + return True + except ResourceNotFoundError: + return False + + def rm_object(self, key): + check.str_param(key, 'key') + check.param_invariant(len(key) > 0, 'key') + + # This operates recursively already so is nice and simple. + self.file_system_client.delete_file(key) + + return ObjectStoreOperation( + op=ObjectStoreOperationType.RM_OBJECT, + key=self.uri_for_key(key), + dest_key=None, + obj=None, + serialization_strategy_name=None, + object_store_name=self.name, + ) + + def cp_object(self, src, dst): + check.str_param(src, 'src') + check.str_param(dst, 'dst') + + # Manually recurse and copy anything that looks like a file. 
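+        # ADLS2 has no copy API yet, so copying goes through the companion Blob
+        # client created in __init__: each blob under the source prefix is copied
+        # server-side with start_copy_from_url after rewriting the key prefix.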
+ for src_blob_properties in self.blob_container_client.list_blobs(src): + # This is the only way I can find to identify a 'directory' + if src_blob_properties["content_settings"] is None: + # Ignore this blob + continue + src_blob = self.blob_container_client.get_blob_client(src_blob_properties["name"]) + new_blob_path = re.sub(r'^{}'.format(src), dst, src_blob_properties["name"]) + new_blob = self.blob_container_client.get_blob_client(new_blob_path) + new_blob.start_copy_from_url(src_blob.url) + + return ObjectStoreOperation( + op=ObjectStoreOperationType.CP_OBJECT, + key=self.uri_for_key(src), + dest_key=self.uri_for_key(dst), + object_store_name=self.name, + ) + + def uri_for_key(self, key, protocol=None): + check.str_param(key, 'key') + protocol = check.opt_str_param(protocol, 'protocol', default='abfss://') + return '{protocol}{filesystem}@{account}.dfs.core.windows.net/{key}'.format( + protocol=protocol, + filesystem=self.file_system_client.file_system_name, + account=self.file_system_client.account_name, + key=key, + ) diff --git a/python_modules/libraries/dagster-azure/dagster_azure/adls2/resources.py b/python_modules/libraries/dagster-azure/dagster_azure/adls2/resources.py new file mode 100644 index 0000000000000..7a51a4b4adb6d --- /dev/null +++ b/python_modules/libraries/dagster-azure/dagster_azure/adls2/resources.py @@ -0,0 +1,76 @@ +from dagster import Field, Selector, String, StringSource, resource + +from .utils import create_adls2_client + + +@resource( + { + 'storage_account': Field(String, description='The storage account name.'), + 'credential': Field( + Selector( + { + 'sas': Field(StringSource, description='SAS token for the account.'), + 'key': Field(StringSource, description='Shared Access Key for the account'), + } + ), + description='The credentials with which to authenticate.', + ), + } +) +def adls2_resource(context): + '''Resource that gives solids access to Azure Data Lake Storage Gen2. + + The underlying client is a :py:class:`~azure.storage.filedatalake.DataLakeServiceClient`. + + Attach this resource definition to a :py:class:`~dagster.ModeDefinition` in order to make it + available to your solids. + + Example: + + .. code-block:: python + + from dagster import ModeDefinition, execute_solid, solid + from dagster_azure.adls2 import adls2_resource + + @solid(required_resource_keys={'adls2'}) + def example_adls2_solid(context): + return list(context.resources.adls2.list_file_systems()) + + result = execute_solid( + example_adls2_solid, + environment_dict={ + 'resources': { + 'adls2': { + 'config': { + 'storage_account': 'my_storage_account' + } + } + } + }, + mode_def=ModeDefinition(resource_defs={'adls2': adls2_resource}), + ) + + Note that your solids must also declare that they require this resource with + `required_resource_keys`, or it will not be initialized for the execution of their compute + functions. + + You may pass credentials to this resource using either a SAS token or a key, using + environment variables if desired: + + .. code-block:: YAML + + resources: + adls2: + config: + storage_account: my_storage_account + # str: The storage account name. + credential: + sas: my_sas_token + # str: the SAS token for the account. + key: + env: AZURE_DATA_LAKE_STORAGE_KEY + # str: The shared access key for the account. 
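+
+    Note that ``credential`` is a selector: supply exactly one of ``sas`` or ``key``
+    in your config (the YAML above lists both only to show the two options).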
+ ''' + storage_account = context.resource_config['storage_account'] + credential = context.resource_config["credential"].copy().popitem()[1] + return create_adls2_client(storage_account, credential) diff --git a/python_modules/libraries/dagster-azure/dagster_azure/adls2/system_storage.py b/python_modules/libraries/dagster-azure/dagster_azure/adls2/system_storage.py new file mode 100644 index 0000000000000..edde88d383c7f --- /dev/null +++ b/python_modules/libraries/dagster-azure/dagster_azure/adls2/system_storage.py @@ -0,0 +1,75 @@ +from dagster import Field, String, SystemStorageData, system_storage +from dagster.core.storage.intermediates_manager import IntermediateStoreIntermediatesManager +from dagster.core.storage.system_storage import fs_system_storage, mem_system_storage + +from .file_manager import ADLS2FileManager +from .intermediate_store import ADLS2IntermediateStore + + +@system_storage( + name='adls2', + is_persistent=True, + config={ + 'adls2_file_system': Field(String, description='ADLS Gen2 file system name'), + 'adls2_prefix': Field(String, is_required=False, default_value='dagster'), + }, + required_resource_keys={'adls2'}, +) +def adls2_system_storage(init_context): + '''Persistent system storage using Azure Data Lake Storage Gen2 for storage. + + Suitable for intermediates storage for distributed executors, so long as + each execution node has network connectivity and credentials for ADLS and + the backing container. + + Attach this system storage definition, as well as the :py:data:`~dagster_azure.adls2_resource` + it requires, to a :py:class:`~dagster.ModeDefinition` in order to make it available to your + pipeline: + + .. code-block:: python + + pipeline_def = PipelineDefinition( + mode_defs=[ + ModeDefinition( + resource_defs={'adls2': adls2_resource, ...}, + system_storage_defs=default_system_storage_defs + [adls2_system_storage, ...], + ... + ), ... + ], ... + ) + + You may configure this storage as follows: + + .. 
code-block:: YAML + + storage: + adls2: + config: + adls2_sa: my-best-storage-account + adls2_file_system: my-cool-file-system + adls2_prefix: good/prefix-for-files- + ''' + client = init_context.resources.adls2 + adls2_base = '{prefix}/storage/{run_id}/files'.format( + prefix=init_context.system_storage_config['adls2_prefix'], + run_id=init_context.pipeline_run.run_id, + ) + return SystemStorageData( + file_manager=ADLS2FileManager( + adls2_client=client, + file_system=init_context.system_storage_config['adls2_file_system'], + prefix=adls2_base, + ), + intermediates_manager=IntermediateStoreIntermediatesManager( + ADLS2IntermediateStore( + client=client, + file_system=init_context.system_storage_config['adls2_file_system'], + prefix=init_context.system_storage_config['adls2_prefix'], + run_id=init_context.pipeline_run.run_id, + type_storage_plugin_registry=init_context.type_storage_plugin_registry, + ) + ), + ) + + +adls2_plus_default_storage_defs = [mem_system_storage, fs_system_storage, adls2_system_storage] diff --git a/python_modules/libraries/dagster-azure/dagster_azure/adls2/utils.py b/python_modules/libraries/dagster-azure/dagster_azure/adls2/utils.py new file mode 100644 index 0000000000000..7f2aa1b3e315f --- /dev/null +++ b/python_modules/libraries/dagster-azure/dagster_azure/adls2/utils.py @@ -0,0 +1,24 @@ +from azure.storage.blob import BlobServiceClient +from azure.storage.filedatalake import DataLakeServiceClient + + +def _create_url(storage_account, subdomain): + return "https://{}.{}.core.windows.net/".format(storage_account, subdomain) + + +def create_adls2_client(storage_account, credential): + """ + Create an ADLS2 client. + """ + account_url = _create_url(storage_account, "dfs") + return DataLakeServiceClient(account_url, credential) + + +def create_blob_client(storage_account, credential): + """ + Create a Blob Storage client. + """ + account_url = _create_url(storage_account, "blob") + if hasattr(credential, "account_key"): + credential = credential.account_key + return BlobServiceClient(account_url, credential) diff --git a/python_modules/libraries/dagster-azure/dagster_azure/blob/compute_log_manager.py b/python_modules/libraries/dagster-azure/dagster_azure/blob/compute_log_manager.py new file mode 100644 index 0000000000000..5d66f0fba5c4b --- /dev/null +++ b/python_modules/libraries/dagster-azure/dagster_azure/blob/compute_log_manager.py @@ -0,0 +1,199 @@ +import itertools +import os +from contextlib import contextmanager + +from azure.storage.blob import generate_blob_sas +from dagster import Field, check, seven +from dagster.core.storage.compute_log_manager import ( + MAX_BYTES_FILE_READ, + ComputeIOType, + ComputeLogFileData, + ComputeLogManager, +) +from dagster.core.storage.local_compute_log_manager import IO_TYPE_EXTENSION, LocalComputeLogManager +from dagster.serdes import ConfigurableClass, ConfigurableClassData +from dagster.utils import ensure_dir, ensure_file + +from .utils import create_blob_client + + +class AzureBlobComputeLogManager(ComputeLogManager, ConfigurableClass): + '''Logs solid compute function stdout and stderr to Azure Blob Storage. + + This is also compatible with Azure Data Lake Storage. + + Users should not instantiate this class directly. Instead, use a YAML block in ``dagster.yaml`` + such as the following: + + .. 
code-block:: YAML + + compute_log_manager: + module: dagster_azure.blob.compute_log_manager + class: AzureBlobComputeLogManager + config: + storage_account: my-storage-account + container: my-container + credential: sas-token-or-secret-key + prefix: "dagster-test-" + local_dir: "/tmp/cool" + + Args: + storage_account (str): The storage account name to which to log. + container (str): The container (or ADLS2 filesystem) to which to log. + secret_key (str): Secret key for the storage account. SAS tokens are not + supported because we need a secret key to generate a SAS token for a download URL. + local_dir (Optional[str]): Path to the local directory in which to stage logs. Default: + ``dagster.seven.get_system_temp_directory()``. + prefix (Optional[str]): Prefix for the log file keys. + inst_data (Optional[ConfigurableClassData]): Serializable representation of the compute + log manager when newed up from config. + ''' + + def __init__( + self, + storage_account, + container, + secret_key, + local_dir=None, + inst_data=None, + prefix='dagster', + ): + self._storage_account = check.str_param(storage_account, 'storage_account') + self._container = check.str_param(container, 'container') + self._blob_prefix = check.str_param(prefix, 'prefix') + check.str_param(secret_key, 'secret_key') + + self._blob_client = create_blob_client(storage_account, secret_key) + self._container_client = self._blob_client.get_container_client(container) + self._download_urls = {} + + # proxy calls to local compute log manager (for subscriptions, etc) + if not local_dir: + local_dir = seven.get_system_temp_directory() + + self.local_manager = LocalComputeLogManager(local_dir) + self._inst_data = check.opt_inst_param(inst_data, 'inst_data', ConfigurableClassData) + + @contextmanager + def _watch_logs(self, pipeline_run, step_key=None): + # proxy watching to the local compute log manager, interacting with the filesystem + with self.local_manager._watch_logs( # pylint: disable=protected-access + pipeline_run, step_key + ): + yield + + @property + def inst_data(self): + return self._inst_data + + @classmethod + def config_type(cls): + return { + 'storage_account': str, + 'container': str, + 'secret_key': str, + 'local_dir': Field(str, is_required=False), + 'prefix': Field(str, is_required=False, default_value='dagster'), + } + + @staticmethod + def from_config_value(inst_data, config_value): + return AzureBlobComputeLogManager(inst_data=inst_data, **config_value) + + def get_local_path(self, run_id, key, io_type): + return self.local_manager.get_local_path(run_id, key, io_type) + + def on_watch_start(self, pipeline_run, step_key): + self.local_manager.on_watch_start(pipeline_run, step_key) + + def on_watch_finish(self, pipeline_run, step_key): + self.local_manager.on_watch_finish(pipeline_run, step_key) + key = self.local_manager.get_key(pipeline_run, step_key) + self._upload_from_local(pipeline_run.run_id, key, ComputeIOType.STDOUT) + self._upload_from_local(pipeline_run.run_id, key, ComputeIOType.STDERR) + + def is_watch_completed(self, run_id, key): + return self.local_manager.is_watch_completed(run_id, key) + + def download_url(self, run_id, key, io_type): + if not self.is_watch_completed(run_id, key): + return self.local_manager.download_url(run_id, key, io_type) + key = self._blob_key(run_id, key, io_type) + if key in self._download_urls: + return self._download_urls[key] + blob = self._container_client.get_blob_client(key) + sas = generate_blob_sas( + self._storage_account, + self._container, + key, + 
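+            # Generating a SAS for the download URL requires the account key,
+            # which is why this class takes a secret key rather than a SAS token.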
account_key=self._blob_client.credential.account_key, + ) + url = blob.url + sas + self._download_urls[key] = url + return url + + def read_logs_file(self, run_id, key, io_type, cursor=0, max_bytes=MAX_BYTES_FILE_READ): + if self._should_download(run_id, key, io_type): + self._download_to_local(run_id, key, io_type) + data = self.local_manager.read_logs_file(run_id, key, io_type, cursor, max_bytes) + return self._from_local_file_data(run_id, key, io_type, data) + + def on_subscribe(self, subscription): + self.local_manager.on_subscribe(subscription) + + def _should_download(self, run_id, key, io_type): + local_path = self.get_local_path(run_id, key, io_type) + if os.path.exists(local_path): + return False + blob_objects = self._container_client.list_blobs(self._blob_key(run_id, key, io_type)) + # Limit the generator to avoid paging since we only need one element + # to return True + limited_blob_objects = itertools.islice(blob_objects, 1) + return len(list(limited_blob_objects)) > 0 + + def _from_local_file_data(self, run_id, key, io_type, local_file_data): + is_complete = self.is_watch_completed(run_id, key) + path = ( + 'https://{account}.blob.core.windows.net/{container}/{key}'.format( + account=self._storage_account, + container=self._container, + key=self._blob_key(run_id, key, io_type), + ) + if is_complete + else local_file_data.path + ) + + return ComputeLogFileData( + path, + local_file_data.data, + local_file_data.cursor, + local_file_data.size, + self.download_url(run_id, key, io_type), + ) + + def _upload_from_local(self, run_id, key, io_type): + path = self.get_local_path(run_id, key, io_type) + ensure_file(path) + key = self._blob_key(run_id, key, io_type) + with open(path, 'rb') as data: + blob = self._container_client.get_blob_client(key) + blob.upload_blob(data) + + def _download_to_local(self, run_id, key, io_type): + path = self.get_local_path(run_id, key, io_type) + ensure_dir(os.path.dirname(path)) + with open(path, 'wb') as fileobj: + blob = self._container_client.get_blob_client(key) + blob.download_blob().readinto(fileobj) + + def _blob_key(self, run_id, key, io_type): + check.inst_param(io_type, 'io_type', ComputeIOType) + extension = IO_TYPE_EXTENSION[io_type] + paths = [ + self._blob_prefix, + 'storage', + run_id, + 'compute_logs', + '{}.{}'.format(key, extension), + ] + return '/'.join(paths) # blob path delimiter diff --git a/python_modules/libraries/dagster-azure/dagster_azure/blob/intermediate_store.py b/python_modules/libraries/dagster-azure/dagster_azure/blob/intermediate_store.py new file mode 100644 index 0000000000000..a46af60d83818 --- /dev/null +++ b/python_modules/libraries/dagster-azure/dagster_azure/blob/intermediate_store.py @@ -0,0 +1,49 @@ +from dagster import check +from dagster.core.storage.intermediate_store import IntermediateStore +from dagster.core.storage.type_storage import TypeStoragePluginRegistry +from dagster.seven import urlparse + +from .object_store import AzureBlobObjectStore + + +class AzureBlobIntermediateStore(IntermediateStore): + '''Intermediate store using Azure Blob storage. + + If your storage account has the ADLS Gen2 hierarchical namespace enabled + this should still work, but it is recommended to use the + :py:class:`~dagster_azure.adls2.intermediate_store.ADLS2IntermediateStore` + instead, which will enable some optimizations for certain types (notably + PySpark DataFrames). 
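+
+    As a rough sketch (the storage account, key and container names below are
+    placeholders, and ``create_blob_client`` is assumed to take
+    ``(storage_account, credential)`` like its ADLS2 counterpart), a store can be
+    constructed directly from a blob client:
+
+    .. code-block:: python
+
+        from dagster.core.utils import make_new_run_id
+
+        from dagster_azure.blob.intermediate_store import AzureBlobIntermediateStore
+        from dagster_azure.blob.utils import create_blob_client
+
+        # Placeholder storage account, key and container names.
+        client = create_blob_client('my_storage_account', 'my_secret_key')
+        intermediate_store = AzureBlobIntermediateStore(
+            container='my-container',
+            run_id=make_new_run_id(),
+            client=client,
+        )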
+ ''' + + def __init__( + self, container, run_id, client, type_storage_plugin_registry=None, prefix='dagster', + ): + check.str_param(container, 'container') + check.str_param(prefix, 'prefix') + check.str_param(run_id, 'run_id') + + object_store = AzureBlobObjectStore( + client.account_name, + # client.credential is non-null if a secret key was used to authenticate + client.credential.account_key if client.credential is not None + # otherwise the SAS token will be in the query string of the URL + else urlparse(client.url).query, + container, + ) + + def root_for_run_id(r_id): + return object_store.key_for_paths([prefix, 'storage', r_id]) + + super(AzureBlobIntermediateStore, self).__init__( + object_store, + root_for_run_id=root_for_run_id, + run_id=run_id, + type_storage_plugin_registry=check.inst_param( + type_storage_plugin_registry + if type_storage_plugin_registry + else TypeStoragePluginRegistry(types_to_register=[]), + 'type_storage_plugin_registry', + TypeStoragePluginRegistry, + ), + ) diff --git a/python_modules/libraries/dagster-azure/dagster_azure/blob/object_store.py b/python_modules/libraries/dagster-azure/dagster_azure/blob/object_store.py new file mode 100644 index 0000000000000..46d5b844c1750 --- /dev/null +++ b/python_modules/libraries/dagster-azure/dagster_azure/blob/object_store.py @@ -0,0 +1,142 @@ +import logging +import re +import sys +from io import BytesIO, StringIO + +from azure.core.exceptions import ResourceNotFoundError + +from dagster import check +from dagster.core.definitions.events import ObjectStoreOperation, ObjectStoreOperationType +from dagster.core.storage.object_store import ObjectStore +from dagster.core.types.marshal import SerializationStrategy + +from .utils import create_blob_client + +DEFAULT_LEASE_DURATION = 60 * 60 # One hour + + +class AzureBlobObjectStore(ObjectStore): + def __init__( + self, storage_account, credential, container, lease_duration=DEFAULT_LEASE_DURATION + ): + self.blob_client = create_blob_client(storage_account, credential,) + self.container_client = self.blob_client.get_container_client(container) + + self.lease_duration = lease_duration + self.container_client.get_container_properties() + super(AzureBlobObjectStore, self).__init__('azure-blob', sep='/') + + def set_object(self, key, obj, serialization_strategy=None): + check.str_param(key, 'key') + + logging.info('Writing Azure Blob object at: ' + self.uri_for_key(key)) + + # cannot check obj since could be arbitrary Python object + check.inst_param( + serialization_strategy, 'serialization_strategy', SerializationStrategy + ) # cannot be none here + + blob = self.container_client.create_blob(key) + with blob.acquire_lease(self.lease_duration) as lease: + with BytesIO() as bytes_io: + if serialization_strategy.write_mode == 'w' and sys.version_info >= (3, 0): + with StringIO() as string_io: + string_io = StringIO() + serialization_strategy.serialize(obj, string_io) + string_io.seek(0) + bytes_io.write(string_io.read().encode('utf-8')) + else: + serialization_strategy.serialize(obj, bytes_io) + bytes_io.seek(0) + blob.upload_blob(bytes_io, lease=lease, overwrite=True) + + return ObjectStoreOperation( + op=ObjectStoreOperationType.SET_OBJECT, + key=self.uri_for_key(key), + dest_key=None, + obj=obj, + serialization_strategy_name=serialization_strategy.name, + object_store_name=self.name, + ) + + def get_object(self, key, serialization_strategy=None): + check.str_param(key, 'key') + check.param_invariant(len(key) > 0, 'key') + check.inst_param( + serialization_strategy, 
'serialization_strategy', SerializationStrategy + ) # cannot be none here + + # FIXME we need better error handling for object store + blob = self.container_client.download_blob(key) + obj = serialization_strategy.deserialize( + BytesIO(blob.readall()) + if serialization_strategy.read_mode == 'rb' + else StringIO(blob.readall().decode(serialization_strategy.encoding)) + ) + return ObjectStoreOperation( + op=ObjectStoreOperationType.GET_OBJECT, + key=self.uri_for_key(key), + dest_key=None, + obj=obj, + serialization_strategy_name=serialization_strategy.name, + object_store_name=self.name, + ) + + def has_object(self, key): + check.str_param(key, 'key') + check.param_invariant(len(key) > 0, 'key') + + try: + blob = self.container_client.get_blob_client(key) + blob.get_blob_properties() + return True + except ResourceNotFoundError: + return False + + def rm_object(self, key): + check.str_param(key, 'key') + check.param_invariant(len(key) > 0, 'key') + + for blob in self.container_client.list_blobs(key): + self.container_client.delete_blob(blob) + + return ObjectStoreOperation( + op=ObjectStoreOperationType.RM_OBJECT, + key=self.uri_for_key(key), + dest_key=None, + obj=None, + serialization_strategy_name=None, + object_store_name=self.name, + ) + + def cp_object(self, src, dst): + check.str_param(src, 'src') + check.str_param(dst, 'dst') + + # Manually recurse and copy anything that looks like a file. + for src_blob_properties in self.container_client.list_blobs(src): + # This is the only way I can find to identify a 'directory' + if src_blob_properties['content_settings'] is None: + # Ignore this blob + continue + src_blob = self.container_client.get_blob_client(src_blob_properties['name']) + dst_blob_path = re.sub(r'^{}'.format(src), dst, src_blob_properties['name']) + dst_blob = self.container_client.get_blob_client(dst_blob_path) + dst_blob.start_copy_from_url(src_blob.url) + + return ObjectStoreOperation( + op=ObjectStoreOperationType.CP_OBJECT, + key=self.uri_for_key(src), + dest_key=self.uri_for_key(dst), + object_store_name=self.name, + ) + + def uri_for_key(self, key, protocol=None): + check.str_param(key, 'key') + protocol = check.opt_str_param(protocol, 'protocol', default='https://') + return '{protocol}@{account}.blob.core.windows.net/{container}/{key}'.format( + protocol=protocol, + account=self.blob_client.account_name, + container=self.container_client.container_name, + key=key, + ) diff --git a/python_modules/libraries/dagster-azure/dagster_azure/version.py b/python_modules/libraries/dagster-azure/dagster_azure/version.py new file mode 100644 index 0000000000000..3d73cc3e7f754 --- /dev/null +++ b/python_modules/libraries/dagster-azure/dagster_azure/version.py @@ -0,0 +1,3 @@ +__version__ = '0.7.12' + +__nightly__ = '2020.05.11' diff --git a/python_modules/libraries/dagster-azure/dagster_azure_tests/__init__.py b/python_modules/libraries/dagster-azure/dagster_azure_tests/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/python_modules/libraries/dagster-azure/dagster_azure_tests/adls2_tests/__init__.py b/python_modules/libraries/dagster-azure/dagster_azure_tests/adls2_tests/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/python_modules/libraries/dagster-azure/dagster_azure_tests/adls2_tests/conftest.py b/python_modules/libraries/dagster-azure/dagster_azure_tests/adls2_tests/conftest.py new file mode 100644 index 0000000000000..56497bf87716e --- /dev/null +++ 
b/python_modules/libraries/dagster-azure/dagster_azure_tests/adls2_tests/conftest.py @@ -0,0 +1,16 @@ +import pytest + + +@pytest.fixture(scope='session') +def storage_account(): + yield 'dagsterscratch80542c2' + + +@pytest.fixture(scope='session') +def file_system(): + yield 'dagster-scratch' + + +@pytest.fixture(scope='session') +def credential(): + yield 'super-secret-creds' diff --git a/python_modules/libraries/dagster-azure/dagster_azure_tests/adls2_tests/test_adls2_file_cache.py b/python_modules/libraries/dagster-azure/dagster_azure_tests/adls2_tests/test_adls2_file_cache.py new file mode 100644 index 0000000000000..1e5ef13d0c313 --- /dev/null +++ b/python_modules/libraries/dagster-azure/dagster_azure_tests/adls2_tests/test_adls2_file_cache.py @@ -0,0 +1,60 @@ +import io + +from dagster_azure.adls2 import ADLS2FakeClient, ADLS2FileCache, ADLS2FileHandle + + +def test_adls2_file_cache_file_not_present(storage_account, file_system, credential): + fake_client = ADLS2FakeClient(storage_account, credential) + file_store = ADLS2FileCache( + storage_account=storage_account, + file_system=file_system, + prefix='some-prefix', + client=fake_client, + overwrite=False, + ) + + assert not file_store.has_file_object('foo') + + +def test_adls2_file_cache_file_present(storage_account, file_system, credential): + fake_client = ADLS2FakeClient(storage_account, credential) + file_store = ADLS2FileCache( + storage_account=storage_account, + file_system=file_system, + prefix='some-prefix', + client=fake_client, + overwrite=False, + ) + + assert not file_store.has_file_object('foo') + + file_store.write_binary_data('foo', 'bar'.encode()) + + assert file_store.has_file_object('foo') + + +def test_adls2_file_cache_correct_handle(storage_account, file_system, credential): + fake_client = ADLS2FakeClient(storage_account, credential) + file_store = ADLS2FileCache( + storage_account=storage_account, + file_system=file_system, + prefix='some-prefix', + client=fake_client, + overwrite=False, + ) + + assert isinstance(file_store.get_file_handle('foo'), ADLS2FileHandle) + + +def test_adls2_file_cache_write_file_object(storage_account, file_system, credential): + fake_client = ADLS2FakeClient(storage_account, credential) + file_store = ADLS2FileCache( + storage_account=storage_account, + file_system=file_system, + prefix='some-prefix', + client=fake_client, + overwrite=False, + ) + + stream = io.BytesIO('content'.encode()) + file_store.write_file_object('foo', stream) diff --git a/python_modules/libraries/dagster-azure/dagster_azure_tests/adls2_tests/test_adls2_file_manager.py b/python_modules/libraries/dagster-azure/dagster_azure_tests/adls2_tests/test_adls2_file_manager.py new file mode 100644 index 0000000000000..1ace0277242ef --- /dev/null +++ b/python_modules/libraries/dagster-azure/dagster_azure_tests/adls2_tests/test_adls2_file_manager.py @@ -0,0 +1,196 @@ +'''TODO sd2k: finish rewriting for ADLS instead of S3''' +import os +import uuid + +import pytest + +from dagster_azure.adls2 import ( + ADLS2FileHandle, + ADLS2FileManager, + create_adls2_fake_resource, + adls2_plus_default_storage_defs, +) + +from dagster import ( + InputDefinition, + Int, + ModeDefinition, + OutputDefinition, + ResourceDefinition, + execute_pipeline, + pipeline, + solid, +) +from dagster.seven import mock + +# For deps + + +def test_adls2_file_manager_write(storage_account, file_system): + file_mock = mock.MagicMock() + adls2_mock = mock.MagicMock() + adls2_mock.get_file_client.return_value = file_mock + adls2_mock.account_name = 
storage_account + file_manager = ADLS2FileManager(adls2_mock, file_system, 'some-key') + + foo_bytes = 'foo'.encode() + + file_handle = file_manager.write_data(foo_bytes) + + assert isinstance(file_handle, ADLS2FileHandle) + + assert file_handle.account == storage_account + assert file_handle.file_system == file_system + assert file_handle.key.startswith('some-key/') + + assert file_mock.upload_data.call_count == 1 + + file_handle = file_manager.write_data(foo_bytes, ext='foo') + + assert isinstance(file_handle, ADLS2FileHandle) + + assert file_handle.account == storage_account + assert file_handle.file_system == file_system + assert file_handle.key.startswith('some-key/') + assert file_handle.key[-4:] == '.foo' + + assert file_mock.upload_data.call_count == 2 + + +def test_adls2_file_manager_read(storage_account, file_system): + state = {'called': 0} + bar_bytes = 'bar'.encode() + + class DownloadMock(mock.MagicMock): + def readinto(self, fileobj): + fileobj.write(bar_bytes) + + class FileMock(mock.MagicMock): + def download_file(self): + state['called'] += 1 + assert state['called'] == 1 + return DownloadMock(file=self) + + class ADLS2Mock(mock.MagicMock): + def get_file_client(self, *_args, **kwargs): + state['file_system'] = kwargs['file_system'] + file_path = kwargs['file_path'] + state['file_path'] = kwargs['file_path'] + return FileMock(file_path=file_path) + + adls2_mock = ADLS2Mock() + file_manager = ADLS2FileManager(adls2_mock, file_system, 'some-key') + file_handle = ADLS2FileHandle(storage_account, file_system, 'some-key/kdjfkjdkfjkd') + with file_manager.read(file_handle) as file_obj: + assert file_obj.read() == bar_bytes + + assert state['file_system'] == file_handle.file_system + assert state['file_path'] == file_handle.key + + # read again. 
cached + with file_manager.read(file_handle) as file_obj: + assert file_obj.read() == bar_bytes + + file_manager.delete_local_temp() + + +@pytest.mark.skip(reason="Need to mock out blob storage") +def test_depends_on_s3_resource_intermediates(storage_account, file_system): + @solid( + input_defs=[InputDefinition('num_one', Int), InputDefinition('num_two', Int)], + output_defs=[OutputDefinition(Int)], + ) + def add_numbers(_, num_one, num_two): + return num_one + num_two + + adls2_fake_resource = create_adls2_fake_resource(storage_account) + + @pipeline( + mode_defs=[ + ModeDefinition( + system_storage_defs=adls2_plus_default_storage_defs, + resource_defs={'adls2': ResourceDefinition.hardcoded_resource(adls2_fake_resource)}, + ) + ] + ) + def adls2_internal_pipeline(): + return add_numbers() + + result = execute_pipeline( + adls2_internal_pipeline, + environment_dict={ + 'solids': { + 'add_numbers': {'inputs': {'num_one': {'value': 2}, 'num_two': {'value': 4}}} + }, + 'storage': {'adls2': {'config': {'adls2_file_system': file_system}}}, + }, + ) + + assert result.success + assert result.result_for_solid('add_numbers').output_value() == 6 + + assert storage_account in adls2_fake_resource.file_systems + + keys = set() + for step_key, output_name in [('add_numbers.compute', 'result')]: + keys.add(create_adls2_key(result.run_id, step_key, output_name)) + + assert set(adls2_fake_resource.file_systems[storage_account].keys()) == keys + + +def create_adls2_key(run_id, step_key, output_name): + return 'dagster/storage/{run_id}/intermediates/{step_key}/{output_name}'.format( + run_id=run_id, step_key=step_key, output_name=output_name + ) + + +@pytest.mark.skip(reason="Need to mock out blob storage") +def test_depends_on_s3_resource_file_manager(storage_account, file_system): + bar_bytes = 'bar'.encode() + + @solid(output_defs=[OutputDefinition(ADLS2FileHandle)]) + def emit_file(context): + return context.file_manager.write_data(bar_bytes) + + @solid(input_defs=[InputDefinition('file_handle', ADLS2FileHandle)]) + def accept_file(context, file_handle): + local_path = context.file_manager.copy_handle_to_local_temp(file_handle) + assert isinstance(local_path, str) + assert open(local_path, 'rb').read() == bar_bytes + + adls2_fake_resource = create_adls2_fake_resource(storage_account) + + @pipeline( + mode_defs=[ + ModeDefinition( + system_storage_defs=adls2_plus_default_storage_defs, + resource_defs={'adls2': ResourceDefinition.hardcoded_resource(adls2_fake_resource)}, + ) + ] + ) + def s3_file_manager_test(): + accept_file(emit_file()) + + result = execute_pipeline( + s3_file_manager_test, + environment_dict={'storage': {'adls2': {'config': {'adls2_file_system': file_system}}}}, + ) + + assert result.success + + keys_in_bucket = set(adls2_fake_resource.file_systems[storage_account].keys()) + + for step_key, output_name in [ + ('emit_file.compute', 'result'), + ('accept_file.compute', 'result'), + ]: + keys_in_bucket.remove(create_adls2_key(result.run_id, step_key, output_name)) + + assert len(keys_in_bucket) == 1 + + file_key = list(keys_in_bucket)[0] + comps = file_key.split('/') + + assert '/'.join(comps[:-1]) == 'dagster/storage/{run_id}/files'.format(run_id=result.run_id) + + assert uuid.UUID(comps[-1]) diff --git a/python_modules/libraries/dagster-azure/dagster_azure_tests/adls2_tests/test_intermediate_store.py b/python_modules/libraries/dagster-azure/dagster_azure_tests/adls2_tests/test_intermediate_store.py new file mode 100644 index 0000000000000..d42965095e9f6 --- /dev/null +++ 
b/python_modules/libraries/dagster-azure/dagster_azure_tests/adls2_tests/test_intermediate_store.py @@ -0,0 +1,410 @@ +'''TODO sd2k: rewrite this for ADLS2 instead of S3''' +import csv +import os +from collections import OrderedDict + +import pytest +from dagster_aws.s3 import S3IntermediateStore, s3_plus_default_storage_defs, s3_resource + +from dagster import ( + Bool, + InputDefinition, + Int, + List, + ModeDefinition, + OutputDefinition, + PipelineRun, + SerializationStrategy, + String, + check, + execute_pipeline, + lambda_solid, + pipeline, + usable_as_dagster_type, +) +from dagster.core.events import DagsterEventType +from dagster.core.execution.api import create_execution_plan, execute_plan, scoped_pipeline_context +from dagster.core.execution.plan.objects import StepOutputHandle +from dagster.core.instance import DagsterInstance +from dagster.core.storage.intermediates_manager import IntermediateStoreIntermediatesManager +from dagster.core.storage.type_storage import TypeStoragePlugin, TypeStoragePluginRegistry +from dagster.core.types.dagster_type import Bool as RuntimeBool +from dagster.core.types.dagster_type import String as RuntimeString +from dagster.core.types.dagster_type import create_any_type, resolve_dagster_type +from dagster.core.utils import make_new_run_id +from dagster.utils.test import yield_empty_pipeline_context + + +class UppercaseSerializationStrategy(SerializationStrategy): # pylint: disable=no-init + def serialize(self, value, write_file_obj): + return write_file_obj.write(bytes(value.upper().encode('utf-8'))) + + def deserialize(self, read_file_obj): + return read_file_obj.read().decode('utf-8').lower() + + +LowercaseString = create_any_type( + 'LowercaseString', serialization_strategy=UppercaseSerializationStrategy('uppercase'), +) + + +def aws_credentials_present(): + return os.getenv('AWS_ACCESS_KEY_ID') and os.getenv('AWS_SECRET_ACCESS_KEY') + + +nettest = pytest.mark.nettest + + +def define_inty_pipeline(should_throw=True): + @lambda_solid + def return_one(): + return 1 + + @lambda_solid(input_defs=[InputDefinition('num', Int)], output_def=OutputDefinition(Int)) + def add_one(num): + return num + 1 + + @lambda_solid + def user_throw_exception(): + raise Exception('whoops') + + @pipeline( + mode_defs=[ + ModeDefinition( + system_storage_defs=s3_plus_default_storage_defs, resource_defs={'s3': s3_resource} + ) + ] + ) + def basic_external_plan_execution(): + add_one(return_one()) + if should_throw: + user_throw_exception() + + return basic_external_plan_execution + + +def get_step_output(step_events, step_key, output_name='result'): + for step_event in step_events: + if ( + step_event.event_type == DagsterEventType.STEP_OUTPUT + and step_event.step_key == step_key + and step_event.step_output_data.output_name == output_name + ): + return step_event + return None + + +@nettest +def test_using_s3_for_subplan(s3_bucket): + pipeline_def = define_inty_pipeline() + + environment_dict = {'storage': {'s3': {'config': {'s3_bucket': s3_bucket}}}} + + run_id = make_new_run_id() + + execution_plan = create_execution_plan(pipeline_def, environment_dict=environment_dict) + + assert execution_plan.get_step_by_key('return_one.compute') + + step_keys = ['return_one.compute'] + instance = DagsterInstance.ephemeral() + pipeline_run = PipelineRun( + pipeline_name=pipeline_def.name, run_id=run_id, environment_dict=environment_dict + ) + + return_one_step_events = list( + execute_plan( + execution_plan.build_subset_plan(step_keys), + environment_dict=environment_dict, + 
pipeline_run=pipeline_run, + instance=instance, + ) + ) + + assert get_step_output(return_one_step_events, 'return_one.compute') + with scoped_pipeline_context( + execution_plan.build_subset_plan(['return_one.compute']), + environment_dict, + pipeline_run, + instance, + ) as context: + + store = S3IntermediateStore( + s3_bucket, + run_id, + s3_session=context.scoped_resources_builder.build(required_resource_keys={'s3'},).s3, + ) + intermediates_manager = IntermediateStoreIntermediatesManager(store) + step_output_handle = StepOutputHandle('return_one.compute') + assert intermediates_manager.has_intermediate(context, step_output_handle) + assert intermediates_manager.get_intermediate(context, Int, step_output_handle).obj == 1 + + add_one_step_events = list( + execute_plan( + execution_plan.build_subset_plan(['add_one.compute']), + environment_dict=environment_dict, + pipeline_run=pipeline_run, + instance=instance, + ) + ) + + assert get_step_output(add_one_step_events, 'add_one.compute') + with scoped_pipeline_context( + execution_plan.build_subset_plan(['add_one.compute']), + environment_dict, + pipeline_run, + instance, + ) as context: + step_output_handle = StepOutputHandle('add_one.compute') + assert intermediates_manager.has_intermediate(context, step_output_handle) + assert intermediates_manager.get_intermediate(context, Int, step_output_handle).obj == 2 + + +class FancyStringS3TypeStoragePlugin(TypeStoragePlugin): # pylint:disable=no-init + @classmethod + def compatible_with_storage_def(cls, _): + # Not needed for these tests + raise NotImplementedError() + + @classmethod + def set_object(cls, intermediate_store, obj, context, dagster_type, paths): + check.inst_param(intermediate_store, 'intermediate_store', S3IntermediateStore) + paths.append(obj) + return intermediate_store.set_object('', context, dagster_type, paths) + + @classmethod + def get_object(cls, intermediate_store, _context, _dagster_type, paths): + check.inst_param(intermediate_store, 'intermediate_store', S3IntermediateStore) + res = intermediate_store.object_store.s3.list_objects( + Bucket=intermediate_store.object_store.bucket, + Prefix=intermediate_store.key_for_paths(paths), + ) + return res['Contents'][0]['Key'].split('/')[-1] + + +@nettest +def test_s3_intermediate_store_with_type_storage_plugin(s3_bucket): + run_id = make_new_run_id() + + intermediate_store = S3IntermediateStore( + run_id=run_id, + s3_bucket=s3_bucket, + type_storage_plugin_registry=TypeStoragePluginRegistry( + [(RuntimeString, FancyStringS3TypeStoragePlugin)] + ), + ) + + with yield_empty_pipeline_context(run_id=run_id) as context: + try: + intermediate_store.set_value('hello', context, RuntimeString, ['obj_name']) + + assert intermediate_store.has_object(context, ['obj_name']) + assert intermediate_store.get_value(context, RuntimeString, ['obj_name']) == 'hello' + + finally: + intermediate_store.rm_object(context, ['obj_name']) + + +@nettest +def test_s3_intermediate_store_with_composite_type_storage_plugin(s3_bucket): + run_id = make_new_run_id() + + intermediate_store = S3IntermediateStore( + run_id=run_id, + s3_bucket=s3_bucket, + type_storage_plugin_registry=TypeStoragePluginRegistry( + [(RuntimeString, FancyStringS3TypeStoragePlugin)] + ), + ) + + with yield_empty_pipeline_context(run_id=run_id) as context: + with pytest.raises(check.NotImplementedCheckError): + intermediate_store.set_value( + ['hello'], context, resolve_dagster_type(List[String]), ['obj_name'] + ) + + +@nettest +def 
test_s3_intermediate_store_composite_types_with_custom_serializer_for_inner_type(s3_bucket): + run_id = make_new_run_id() + + intermediate_store = S3IntermediateStore(run_id=run_id, s3_bucket=s3_bucket) + with yield_empty_pipeline_context(run_id=run_id) as context: + try: + intermediate_store.set_object( + ['foo', 'bar'], context, resolve_dagster_type(List[LowercaseString]), ['list'], + ) + assert intermediate_store.has_object(context, ['list']) + assert intermediate_store.get_object( + context, resolve_dagster_type(List[Bool]), ['list'] + ).obj == ['foo', 'bar'] + + finally: + intermediate_store.rm_object(context, ['foo']) + + +@nettest +def test_s3_intermediate_store_with_custom_serializer(s3_bucket): + run_id = make_new_run_id() + + intermediate_store = S3IntermediateStore(run_id=run_id, s3_bucket=s3_bucket) + + with yield_empty_pipeline_context(run_id=run_id) as context: + try: + intermediate_store.set_object('foo', context, LowercaseString, ['foo']) + + assert ( + intermediate_store.object_store.s3.get_object( + Bucket=intermediate_store.object_store.bucket, + Key='/'.join([intermediate_store.root] + ['foo']), + )['Body'] + .read() + .decode('utf-8') + == 'FOO' + ) + + assert intermediate_store.has_object(context, ['foo']) + assert intermediate_store.get_object(context, LowercaseString, ['foo']).obj == 'foo' + finally: + intermediate_store.rm_object(context, ['foo']) + + +@nettest +def test_s3_pipeline_with_custom_prefix(s3_bucket): + s3_prefix = 'custom_prefix' + + pipe = define_inty_pipeline(should_throw=False) + environment_dict = { + 'storage': {'s3': {'config': {'s3_bucket': s3_bucket, 's3_prefix': s3_prefix}}} + } + + pipeline_run = PipelineRun(pipeline_name=pipe.name, environment_dict=environment_dict) + instance = DagsterInstance.ephemeral() + + result = execute_pipeline(pipe, environment_dict=environment_dict,) + assert result.success + + execution_plan = create_execution_plan(pipe, environment_dict) + with scoped_pipeline_context( + execution_plan, environment_dict, pipeline_run, instance, + ) as context: + store = S3IntermediateStore( + run_id=result.run_id, + s3_bucket=s3_bucket, + s3_prefix=s3_prefix, + s3_session=context.scoped_resources_builder.build(required_resource_keys={'s3'}).s3, + ) + intermediates_manager = IntermediateStoreIntermediatesManager(store) + assert store.root == '/'.join(['custom_prefix', 'storage', result.run_id]) + assert ( + intermediates_manager.get_intermediate( + context, Int, StepOutputHandle('return_one.compute') + ).obj + == 1 + ) + assert ( + intermediates_manager.get_intermediate( + context, Int, StepOutputHandle('add_one.compute') + ).obj + == 2 + ) + + +@nettest +def test_s3_intermediate_store_with_custom_prefix(s3_bucket): + run_id = make_new_run_id() + + intermediate_store = S3IntermediateStore( + run_id=run_id, s3_bucket=s3_bucket, s3_prefix='custom_prefix' + ) + assert intermediate_store.root == '/'.join(['custom_prefix', 'storage', run_id]) + + try: + with yield_empty_pipeline_context(run_id=run_id) as context: + + intermediate_store.set_object(True, context, RuntimeBool, ['true']) + + assert intermediate_store.has_object(context, ['true']) + assert intermediate_store.uri_for_paths(['true']).startswith( + 's3://%s/custom_prefix' % s3_bucket + ) + + finally: + intermediate_store.rm_object(context, ['true']) + + +@nettest +def test_s3_intermediate_store(s3_bucket): + run_id = make_new_run_id() + run_id_2 = make_new_run_id() + + intermediate_store = S3IntermediateStore(run_id=run_id, s3_bucket=s3_bucket) + assert intermediate_store.root 
== '/'.join(['dagster', 'storage', run_id]) + + intermediate_store_2 = S3IntermediateStore(run_id=run_id_2, s3_bucket=s3_bucket) + assert intermediate_store_2.root == '/'.join(['dagster', 'storage', run_id_2]) + + try: + with yield_empty_pipeline_context(run_id=run_id) as context: + + intermediate_store.set_object(True, context, RuntimeBool, ['true']) + + assert intermediate_store.has_object(context, ['true']) + assert intermediate_store.get_object(context, RuntimeBool, ['true']).obj is True + assert intermediate_store.uri_for_paths(['true']).startswith('s3://') + + intermediate_store_2.copy_object_from_run(context, run_id, ['true']) + assert intermediate_store_2.has_object(context, ['true']) + assert intermediate_store_2.get_object(context, RuntimeBool, ['true']).obj is True + finally: + intermediate_store.rm_object(context, ['true']) + intermediate_store_2.rm_object(context, ['true']) + + +class CsvSerializationStrategy(SerializationStrategy): + def __init__(self): + super(CsvSerializationStrategy, self).__init__( + "csv_strategy", read_mode="r", write_mode="w" + ) + + def serialize(self, value, write_file_obj): + fieldnames = value[0] + writer = csv.DictWriter(write_file_obj, fieldnames) + writer.writeheader() + writer.writerows(value) + + def deserialize(self, read_file_obj): + reader = csv.DictReader(read_file_obj) + return LessSimpleDataFrame([row for row in reader]) + + +@usable_as_dagster_type( + name="LessSimpleDataFrame", + description=("A naive representation of a data frame, e.g., as returned by " "csv.DictReader."), + serialization_strategy=CsvSerializationStrategy(), +) +class LessSimpleDataFrame(list): + pass + + +def test_custom_read_write_mode(s3_bucket): + run_id = make_new_run_id() + intermediate_store = S3IntermediateStore(run_id=run_id, s3_bucket=s3_bucket) + data_frame = [OrderedDict({'foo': '1', 'bar': '1'}), OrderedDict({'foo': '2', 'bar': '2'})] + try: + with yield_empty_pipeline_context(run_id=run_id) as context: + intermediate_store.set_object( + data_frame, context, resolve_dagster_type(LessSimpleDataFrame), ['data_frame'] + ) + + assert intermediate_store.has_object(context, ['data_frame']) + assert ( + intermediate_store.get_object( + context, resolve_dagster_type(LessSimpleDataFrame), ['data_frame'] + ).obj + == data_frame + ) + assert intermediate_store.uri_for_paths(['data_frame']).startswith('s3://') + + finally: + intermediate_store.rm_object(context, ['data_frame']) diff --git a/python_modules/libraries/dagster-azure/dagster_azure_tests/adls2_tests/test_object_store.py b/python_modules/libraries/dagster-azure/dagster_azure_tests/adls2_tests/test_object_store.py new file mode 100644 index 0000000000000..24c31fb350d55 --- /dev/null +++ b/python_modules/libraries/dagster-azure/dagster_azure_tests/adls2_tests/test_object_store.py @@ -0,0 +1,39 @@ +from dagster_azure.adls2 import ADLS2ObjectStore, ADLS2FakeClient + +from dagster.core.storage.object_store import DEFAULT_SERIALIZATION_STRATEGY +from dagster.seven import mock + + +@mock.patch("dagster_azure.adls2.object_store.create_adls2_client") +def test_adls2_object_store( + mock_create_adls2_client, storage_account, credential, file_system, caplog +): # pylint: disable=too-many-function-args + mock_create_adls2_client.return_value = ADLS2FakeClient(storage_account, credential) + + key = 'foo' + # Uses mock ADLS2 client + adls2_obj_store = ADLS2ObjectStore(storage_account, credential, file_system) + res = adls2_obj_store.set_object(key, True, DEFAULT_SERIALIZATION_STRATEGY) + assert res.key == 
'abfss://{fs}@{account}.dfs.core.windows.net/{key}'.format( + fs=file_system, account=storage_account, key=key + ) + + adls2_obj_store.set_object(key, True, DEFAULT_SERIALIZATION_STRATEGY) + assert 'Removing existing ADLS2 key' in caplog.text + + assert adls2_obj_store.has_object(key) + assert adls2_obj_store.get_object(key, DEFAULT_SERIALIZATION_STRATEGY).obj is True + + # Harder to test this since it requires a fake synchronised Blob client, + # since cp_object uses blob APIs to communicate... + # adls2_obj_store.cp_object(key, 'bar') + # assert adls2_obj_store.has_object('bar') + + adls2_obj_store.rm_object(key) + assert not adls2_obj_store.has_object(key) + + assert adls2_obj_store.uri_for_key( + key + ) == 'abfss://{fs}@{account}.dfs.core.windows.net/{key}'.format( + fs=file_system, account=storage_account, key=key + ) diff --git a/python_modules/libraries/dagster-azure/dagster_azure_tests/blob_tests/test_compute_log_manager.py b/python_modules/libraries/dagster-azure/dagster_azure_tests/blob_tests/test_compute_log_manager.py new file mode 100644 index 0000000000000..845c4165eb414 --- /dev/null +++ b/python_modules/libraries/dagster-azure/dagster_azure_tests/blob_tests/test_compute_log_manager.py @@ -0,0 +1,114 @@ +import os +import sys + +import boto3 +import six +from dagster_aws.s3 import S3ComputeLogManager +from moto import mock_s3 + +from dagster import DagsterEventType, execute_pipeline, pipeline, seven, solid +from dagster.core.instance import DagsterInstance, InstanceType +from dagster.core.storage.compute_log_manager import ComputeIOType +from dagster.core.storage.event_log import SqliteEventLogStorage +from dagster.core.storage.root import LocalArtifactStorage +from dagster.core.storage.runs import SqliteRunStorage + +HELLO_WORLD = 'Hello World' +SEPARATOR = os.linesep if (os.name == 'nt' and sys.version_info < (3,)) else '\n' +EXPECTED_LOGS = [ + 'STEP_START - Started execution of step "easy.compute".', + 'STEP_OUTPUT - Yielded output "result" of type "Any". 
(Type check passed).', + 'STEP_SUCCESS - Finished execution of step "easy.compute"', +] + + +@mock_s3 +def test_compute_log_manager(s3_bucket): + @pipeline + def simple(): + @solid + def easy(context): + context.log.info('easy') + print(HELLO_WORLD) + return 'easy' + + easy() + + # Uses mock S3 + s3 = boto3.client('s3') + s3.create_bucket(Bucket=s3_bucket) + + with seven.TemporaryDirectory() as temp_dir: + run_store = SqliteRunStorage.from_local(temp_dir) + event_store = SqliteEventLogStorage(temp_dir) + manager = S3ComputeLogManager(bucket=s3_bucket, prefix='my_prefix', local_dir=temp_dir) + instance = DagsterInstance( + instance_type=InstanceType.PERSISTENT, + local_artifact_storage=LocalArtifactStorage(temp_dir), + run_storage=run_store, + event_storage=event_store, + compute_log_manager=manager, + ) + result = execute_pipeline(simple, instance=instance) + compute_steps = [ + event.step_key + for event in result.step_event_list + if event.event_type == DagsterEventType.STEP_START + ] + assert len(compute_steps) == 1 + step_key = compute_steps[0] + + stdout = manager.read_logs_file(result.run_id, step_key, ComputeIOType.STDOUT) + assert stdout.data == HELLO_WORLD + SEPARATOR + + stderr = manager.read_logs_file(result.run_id, step_key, ComputeIOType.STDERR) + for expected in EXPECTED_LOGS: + assert expected in stderr.data + + # Check S3 directly + s3_object = s3.get_object( + Bucket=s3_bucket, + Key='{prefix}/storage/{run_id}/compute_logs/easy.compute.err'.format( + prefix='my_prefix', run_id=result.run_id + ), + ) + stderr_s3 = six.ensure_str(s3_object['Body'].read()) + for expected in EXPECTED_LOGS: + assert expected in stderr_s3 + + # Check download behavior by deleting locally cached logs + compute_logs_dir = os.path.join(temp_dir, result.run_id, 'compute_logs') + for filename in os.listdir(compute_logs_dir): + os.unlink(os.path.join(compute_logs_dir, filename)) + + stdout = manager.read_logs_file(result.run_id, step_key, ComputeIOType.STDOUT) + assert stdout.data == HELLO_WORLD + SEPARATOR + + stderr = manager.read_logs_file(result.run_id, step_key, ComputeIOType.STDERR) + for expected in EXPECTED_LOGS: + assert expected in stderr.data + + +@mock_s3 +def test_compute_log_manager_from_config(s3_bucket): + s3_prefix = 'foobar' + + dagster_yaml = ''' +compute_logs: + module: dagster_aws.s3.compute_log_manager + class: S3ComputeLogManager + config: + bucket: "{s3_bucket}" + local_dir: "/tmp/cool" + prefix: "{s3_prefix}" +'''.format( + s3_bucket=s3_bucket, s3_prefix=s3_prefix + ) + + with seven.TemporaryDirectory() as tempdir: + with open(os.path.join(tempdir, 'dagster.yaml'), 'wb') as f: + f.write(six.ensure_binary(dagster_yaml)) + + instance = DagsterInstance.from_config(tempdir) + assert instance.compute_log_manager._s3_bucket == s3_bucket # pylint: disable=protected-access + assert instance.compute_log_manager._s3_prefix == s3_prefix # pylint: disable=protected-access diff --git a/python_modules/libraries/dagster-azure/dagster_azure_tests/test_version.py b/python_modules/libraries/dagster-azure/dagster_azure_tests/test_version.py new file mode 100644 index 0000000000000..355494028ba40 --- /dev/null +++ b/python_modules/libraries/dagster-azure/dagster_azure_tests/test_version.py @@ -0,0 +1,5 @@ +from dagster_azure.version import __version__ + + +def test_version(): + assert __version__ diff --git a/python_modules/libraries/dagster-azure/dev-requirements.txt b/python_modules/libraries/dagster-azure/dev-requirements.txt new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git 
a/python_modules/libraries/dagster-azure/setup.cfg b/python_modules/libraries/dagster-azure/setup.cfg new file mode 100644 index 0000000000000..8183238ab1c7f --- /dev/null +++ b/python_modules/libraries/dagster-azure/setup.cfg @@ -0,0 +1,2 @@ +[metadata] +license_files = LICENSE diff --git a/python_modules/libraries/dagster-azure/setup.py b/python_modules/libraries/dagster-azure/setup.py new file mode 100644 index 0000000000000..040c2a539e816 --- /dev/null +++ b/python_modules/libraries/dagster-azure/setup.py @@ -0,0 +1,53 @@ +import argparse +import sys + +from setuptools import find_packages, setup + + +def get_version(name): + version = {} + with open('dagster_azure/version.py') as fp: + exec(fp.read(), version) # pylint: disable=W0122 + + if name == 'dagster-azure': + return version['__version__'] + elif name == 'dagster-azure-nightly': + return version['__nightly__'] + else: + raise Exception('Shouldn\'t be here: bad package name {name}'.format(name=name)) + + +parser = argparse.ArgumentParser() +parser.add_argument('--nightly', action='store_true') + + +def _do_setup(name='dagster-azure'): + setup( + name=name, + version=get_version(name), + author='Elementl', + license='Apache-2.0', + description='Package for Azure-specific Dagster framework solid and resource components.', + url='https://github.com/dagster-io/dagster/tree/master/python_modules/libraries/dagster-azure', + classifiers=[ + 'Programming Language :: Python :: 2.7', + 'Programming Language :: Python :: 3.5', + 'Programming Language :: Python :: 3.6', + 'License :: OSI Approved :: Apache Software License', + 'Operating System :: OS Independent', + ], + packages=find_packages(exclude=['test']), + include_package_data=True, + install_requires=['azure-storage-file-datalake~=12.0.1', 'dagster'], + entry_points={'console_scripts': ['dagster-azure = dagster_azure.cli.cli:main']}, + zip_safe=False, + ) + + +if __name__ == '__main__': + parsed, unparsed = parser.parse_known_args() + sys.argv = [sys.argv[0]] + unparsed + if parsed.nightly: + _do_setup('dagster-azure-nightly') + else: + _do_setup('dagster-azure') diff --git a/python_modules/libraries/dagster-azure/tox.ini b/python_modules/libraries/dagster-azure/tox.ini new file mode 100644 index 0000000000000..da4d1347591c1 --- /dev/null +++ b/python_modules/libraries/dagster-azure/tox.ini @@ -0,0 +1,24 @@ +[tox] +envlist = py{37,36,35,27}-{unix,windows} + +[testenv] +passenv = CI_* COVERALLS_REPO_TOKEN AZURE_* BUILDKITE SSH_* +deps = + -e ../../dagster + -r ../../dagster/dev-requirements.txt + -e ../dagster-spark + -e ../dagster-pyspark + -r ./dev-requirements.txt + -e . +usedevelop = true +whitelist_externals = + /bin/bash + echo +commands = + !windows: /bin/bash -c '! pip list --exclude-editable | grep -e dagster -e dagit' + coverage erase + echo -e "--- \033[0;32m:pytest: Running tox tests\033[0m" + pytest -vv --junitxml=test_results.xml --cov=dagster_azure --cov-append --cov-report= + coverage report --omit='.tox/*,**/test_*.py' --skip-covered + coverage html --omit='.tox/*,**/test_*.py' + coverage xml --omit='.tox/*,**/test_*.py'
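
For orientation, here is a minimal sketch (not part of the diff above) of how the pieces exercised by adls2_tests/conftest.py and test_adls2_file_cache.py fit together outside of pytest. It uses only names and calls that appear in those tests (ADLS2FakeClient, ADLS2FileCache, ADLS2FileHandle); the account, file-system, and credential strings are the same placeholder fixture values, and the fake client presumably stands in for the real Data Lake Gen2 service client from azure-storage-file-datalake (pinned in setup.py), so no Azure account is assumed.

from dagster_azure.adls2 import ADLS2FakeClient, ADLS2FileCache, ADLS2FileHandle

# Placeholder values mirroring the session-scoped fixtures in adls2_tests/conftest.py.
storage_account = 'dagsterscratch80542c2'
file_system = 'dagster-scratch'
credential = 'super-secret-creds'

# In-memory fake client, as used by the file cache and object store tests above;
# nothing here talks to a real storage account.
fake_client = ADLS2FakeClient(storage_account, credential)

file_cache = ADLS2FileCache(
    storage_account=storage_account,
    file_system=file_system,
    prefix='some-prefix',
    client=fake_client,
    overwrite=False,
)

# Nothing has been written under the 'foo' key yet.
assert not file_cache.has_file_object('foo')

# Write some bytes under the key, then confirm the cache now reports the object
# and hands back a typed handle for it.
file_cache.write_binary_data('foo', 'bar'.encode())
assert file_cache.has_file_object('foo')
assert isinstance(file_cache.get_file_handle('foo'), ADLS2FileHandle)

test_adls2_file_manager.py exercises the same ADLS2FileHandle type via ADLS2FileManager.write_data, but against mock clients rather than ADLS2FakeClient.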