From 1fb5e82bbd7f1ee404c72dc98b483778fff9ef3d Mon Sep 17 00:00:00 2001 From: David Katz <41651296+DavidKatz-il@users.noreply.github.com> Date: Wed, 29 Jul 2020 01:10:08 +0300 Subject: [PATCH 1/4] add data_frame --- .../dagster-dask/dagster_dask/__init__.py | 6 +- .../dagster-dask/dagster_dask/data_frame.py | 683 ++++++++++++++++++ 2 files changed, 688 insertions(+), 1 deletion(-) create mode 100644 python_modules/libraries/dagster-dask/dagster_dask/data_frame.py diff --git a/python_modules/libraries/dagster-dask/dagster_dask/__init__.py b/python_modules/libraries/dagster-dask/dagster_dask/__init__.py index 4023e247032de..0b61059ada542 100644 --- a/python_modules/libraries/dagster-dask/dagster_dask/__init__.py +++ b/python_modules/libraries/dagster-dask/dagster_dask/__init__.py @@ -1,8 +1,12 @@ from dagster.core.utils import check_dagster_package_version +from .data_frame import DataFrame from .executor import dask_executor from .version import __version__ check_dagster_package_version('dagster-dask', __version__) -__all__ = ['dask_executor'] +__all__ = [ + 'DataFrame', + 'dask_executor', +] diff --git a/python_modules/libraries/dagster-dask/dagster_dask/data_frame.py b/python_modules/libraries/dagster-dask/dagster_dask/data_frame.py new file mode 100644 index 0000000000000..e48805b173c84 --- /dev/null +++ b/python_modules/libraries/dagster-dask/dagster_dask/data_frame.py @@ -0,0 +1,683 @@ +import dask.dataframe as dd + +from dagster import ( + Any, + AssetMaterialization, + Bool, + DagsterInvariantViolationError, + DagsterType, + Enum, + EnumValue, + EventMetadataEntry, + Field, + Int, + Permissive, + String, + TypeCheck, + check, + dagster_type_loader, + dagster_type_materializer, +) +from dagster.config.field_utils import Selector + +WriteCompressionTextOptions = Enum( + 'WriteCompressionText', [EnumValue('gzip'), EnumValue('bz2'), EnumValue('xz'),], +) + +EngineParquetOptions = Enum( + 'EngineParquet', [EnumValue('auto'), EnumValue('fastparquet'), EnumValue('pyarrow'),], +) + + +def dict_without_keys(ddict, *keys): + return {key: value for key, value in ddict.items() if key not in set(keys)} + + +@dagster_type_materializer( + Selector( + { + 'csv': Permissive( + { + 'path': Field( + Any, + is_required=True, + description="str or list, Path glob indicating the naming scheme for the output files", + ), + 'single_file': Field( + Bool, + is_required=False, + description="Whether to save everything into a single CSV file. Under the single file mode, each partition is appended at the end of the specified CSV file. Note that not all filesystems support the append mode and thus the single file mode, especially on cloud storage systems such as S3 or GCS. A warning will be issued when writing to a file that is not backed by a local filesystem.", + ), + 'encoding': Field( + String, + is_required=False, + description="A string representing the encoding to use in the output file, defaults to 'ascii' on Python 2 and 'utf-8' on Python 3.", + ), + 'mode': Field( + String, is_required=False, description="Python write mode, default 'w'", + ), + 'compression': Field( + WriteCompressionTextOptions, + is_required=False, + description="a string representing the compression to use in the output file, allowed values are 'gzip', 'bz2', 'xz'", + ), + 'compute': Field( + Bool, + is_required=False, + description="If true, immediately executes. 
If False, returns a set of delayed objects, which can be computed at a later time.", + ), + 'storage_options': Field( + Permissive(), + is_required=False, + description="Parameters passed on to the backend filesystem class.", + ), + 'header_first_partition_only': Field( + Bool, + is_required=False, + description="If set to `True`, only write the header row in the first output file. By default, headers are written to all partitions under the multiple file mode (`single_file` is `False`) and written only once under the single file mode (`single_file` is `True`). It must not be `False` under the single file mode.", + ), + 'compute_kwargs': Field( + Permissive(), + is_required=False, + description="Options to be passed in to the compute method", + ), + } + ), + 'parquet': Permissive( + { + 'path': Field( + Any, + is_required=True, + description="str or pathlib.Path, Destination directory for data. Prepend with protocol like ``s3://`` or ``hdfs://`` for remote data.", + ), + 'engine': Field( + EngineParquetOptions, + is_required=False, + description="{'auto', 'fastparquet', 'pyarrow'}, default 'auto' Parquet library to use. If only one library is installed, it will use that one; if both, it will use 'fastparquet'.", + ), + 'compression': Field( + Any, + is_required=False, + description="str or dict, optional Either a string like ``'snappy'`` or a dictionary mapping column names to compressors like ``{'name': 'gzip', 'values': 'snappy'}``. The default is ``'default'``, which uses the default compression for whichever engine is selected.", + ), + 'write_index': Field( + Bool, + is_required=False, + description="Whether or not to write the index. Defaults to True.", + ), + 'append': Field( + Bool, + is_required=False, + description="If False (default), construct data-set from scratch. If True, add new row-group(s) to an existing data-set. In the latter case, the data-set must exist, and the schema must match the input data.", + ), + 'ignore_divisions': Field( + Bool, + is_required=False, + description="If False (default) raises error when previous divisions overlap with the new appended divisions. Ignored if append=False.", + ), + 'partition_on': Field( + list, + is_required=False, + description="Construct directory-based partitioning by splitting on these fields values. Each dask partition will result in one or more datafiles, there will be no global groupby.", + ), + 'storage_options': Field( + Permissive(), + is_required=False, + description="Key/value pairs to be passed on to the file-system backend, if any.", + ), + 'write_metadata_file': Field( + Bool, + is_required=False, + description="Whether to write the special '_metadata' file.", + ), + 'compute': Field( + Bool, + is_required=False, + description="If True (default) then the result is computed immediately. If False then a ``dask.delayed`` object is returned for future computation.", + ), + 'compute_kwargs': Field( + Permissive(), + is_required=False, + description="Options to be passed in to the compute method.", + ), + } + ), + 'hdf': Permissive( + { + 'path': Field( + Any, + is_required=True, + description="str or pathlib.Path, Path to a target filename. Supports strings, ``pathlib.Path``, or any object implementing the ``__fspath__`` protocol. May contain a ``*`` to denote many filenames.", + ), + 'key': Field( + String, + is_required=True, + description="Datapath within the files. May contain a ``*`` to denote many locations", + ), + 'compute': Field( + Bool, + is_required=False, + description="Whether or not to execute immediately. 
If False then this returns a ``dask.Delayed`` value.", + ), + 'scheduler': Field( + String, + is_required=False, + description="The scheduler to use, like 'threads' or 'processes'", + ), + } + ), + 'json': Permissive( + { + 'path': Field( + Any, + is_required=True, + description="str or list, Location to write to. If a string, and there are more than one partitions in df, should include a glob character to expand into a set of file names, or provide a ``name_function=`` parameter. Supports protocol specifications such as ``'s3://'``.", + ), + 'encoding': Field( + String, + is_required=False, + description="default is 'utf-8', The text encoding to implement, e.g., 'utf-8'", + ), + 'errors': Field( + String, + is_required=False, + description="default is 'strict', how to respond to errors in the conversion (see ``str.encode()``)", + ), + 'storage_options': Field( + Permissive(), + is_required=False, + description="Passed to backend file-system implementation", + ), + 'compute': Field( + Bool, + is_required=False, + description="If true, immediately executes. If False, returns a set of delayed objects, which can be computed at a later time.", + ), + 'compute_kwargs': Field( + Permissive(), + is_required=False, + description="Options to be passed in to the compute method", + ), + 'compression': Field( + String, is_required=False, description="String like 'gzip' or 'xz'.", + ), + }, + ), + 'sql': Permissive( + { + 'name': Field(String, is_required=True, description="Name of SQL table",), + 'uri': Field( + String, + is_required=True, + description="Full sqlalchemy URI for the database connection", + ), + 'schema': Field( + String, + is_required=False, + description="Specify the schema (if database flavor supports this). If None, use default schema.", + ), + 'if_exists': Field( + String, + is_required=False, + description=""" + {'fail', 'replace', 'append'}, default 'fail'" + How to behave if the table already exists. + * fail: Raise a ValueError. + * replace: Drop the table before inserting new values. + * append: Insert new values to the existing table. + """, + ), + 'index': Field( + Bool, + is_required=False, + description="default is True, Write DataFrame index as a column. Uses `index_label` as the column name in the table.", + ), + 'index_label': Field( + Any, + is_required=False, + description="str or sequence, default None Column label for index column(s). If None is given (default) and `index` is True, then the index names are used. A sequence should be given if the DataFrame uses MultiIndex.", + ), + 'chunksize': Field( + Int, + is_required=False, + description="Specify the number of rows in each batch to be written at a time. By default, all rows will be written at once", + ), + 'dtype': Field( + Any, + is_required=False, + description="dict or scalar, Specifying the datatype for columns. If a dictionary is used, the keys should be the column names and the values should be the SQLAlchemy types or strings for the sqlite3 legacy mode. If a scalar is provided, it will be applied to all columns.", + ), + 'method': Field( + String, + is_required=False, + description=""" + {None, 'multi', callable}, default None + Controls the SQL insertion clause used: + * None : Uses standard SQL ``INSERT`` clause (one per row). + * 'multi': Pass multiple values in a single ``INSERT`` clause. + * callable with signature ``(pd_table, conn, keys, data_iter)``. + Details and a sample callable implementation can be found in the + section :ref:`insert method `. 
+ """, + ), + 'compute': Field( + Bool, + is_required=False, + description="default is True, When true, call dask.compute and perform the load into SQL; otherwise, return a Dask object (or array of per-block objects when parallel=True)", + ), + 'parallel': Field( + Bool, + is_required=False, + description="default is False, When true, have each block append itself to the DB table concurrently. This can result in DB rows being in a different order than the source DataFrame's corresponding rows. When false, load each block into the SQL DB in sequence.", + ), + }, + ), + }, + ) +) +def dataframe_materializer(_context, config, dask_df): + check.inst_param(dask_df, 'dask_df', dd.DataFrame) + file_type, file_options = list(config.items())[0] + path = file_options.get('path') + + if file_type == 'csv': + dask_df.to_csv(path, **dict_without_keys(file_options, 'path')) + elif file_type == 'parquet': + dask_df.to_parquet(path, **dict_without_keys(file_options, 'path')) + elif file_type == 'hdf': + dask_df.to_hdf(path, **dict_without_keys(file_options, 'path')) + elif file_type == 'json': + dask_df.to_json(path, **dict_without_keys(file_options, 'path')) + elif file_type == 'sql': + dask_df.to_sql(**file_options) + else: + check.failed('Unsupported file_type {file_type}'.format(file_type=file_type)) + + return AssetMaterialization.file(path) + + +@dagster_type_loader( + Selector( + { + 'csv': Permissive( + { + 'path': Field( + Any, + is_required=True, + description="str or list, Absolute or relative filepath(s). Prefix with a protocol like `s3://` to read from alternative filesystems. To read from multiple files you can pass a globstring or a list of paths, with the caveat that they must all have the same protocol.", + ), + 'blocksize': Field( + Any, + is_required=False, + description="str or int or None, Number of bytes by which to cut up larger files. Default value is computed based on available physical memory and the number of cores, up to a maximum of 64MB. Can be a number like 64000000` or a string like ``'64MB'. If None, a single block is used for each file.", + ), + 'sample': Field( + Int, + is_required=False, + description="Number of bytes to use when determining dtypes.", + ), + 'assume_missing': Field( + Bool, + is_required=False, + description="If True, all integer columns that aren’t specified in `dtype` are assumed to contain missing values, and are converted to floats. Default is False.", + ), + 'storage_options': Field( + Permissive(), + is_required=False, + description="Extra options that make sense for a particular storage connection, e.g. host, port, username, password, etc.", + ), + 'include_path_column': Field( + Any, + is_required=False, + description="bool or str, Whether or not to include the path to each particular file. If True a new column is added to the dataframe called path. If str, sets new column name. Default is False.", + ), + } + ), + 'parquet': Permissive( + { + 'path': Field( + Any, + is_required=True, + description="str or list, Source directory for data, or path(s) to individual parquet files. Prefix with a protocol like s3:// to read from alternative filesystems. To read from multiple files you can pass a globstring or a list of paths, with the caveat that they must all have the same protocol.", + ), + 'columns': Field( + Any, + is_required=False, + description="str or list or None (default), Field name(s) to read in as columns in the output. By default all non-index fields will be read (as determined by the pandas parquet metadata, if present). 
Provide a single field name instead of a list to read in the data as a Series.", + ), + 'filters': Field( + Any, + is_required=False, + description=""" + Union[List[Tuple[str, str, Any]], List[List[Tuple[str, str, Any]]]], + List of filters to apply, like [[('x', '=', 0), ...], ...]. + This implements partition-level (hive) filtering only, i.e., to prevent the loading of some row-groups and/or files. + Predicates can be expressed in disjunctive normal form (DNF). This means that the innermost tuple describes a single column predicate. These inner predicates are combined with an AND conjunction into a larger predicate. The outer-most list then combines all of the combined filters with an OR disjunction. + Predicates can also be expressed as a List[Tuple]. These are evaluated as an AND conjunction. To express OR in predictates, one must use the (preferred) List[List[Tuple]] notation. + """, + ), + 'index': Field( + Any, + is_required=False, + description="list or False or None (default), Field name(s) to use as the output frame index. By default will be inferred from the pandas parquet file metadata (if present). Use False to read all fields as columns.", + ), + 'categories': Field( + Any, + is_required=False, + description="list or dict or None, For any fields listed here, if the parquet encoding is Dictionary, the column will be created with dtype category. Use only if it is guaranteed that the column is encoded as dictionary in all row-groups. If a list, assumes up to 2**16-1 labels; if a dict, specify the number of labels expected; if None, will load categories automatically for data written by dask/fastparquet, not otherwise.", + ), + 'storage_options': Field( + Permissive(), + is_required=False, + description="Key/value pairs to be passed on to the file-system backend, if any.", + ), + 'engine': Field( + EngineParquetOptions, + is_required=False, + description="Parquet reader library to use. If only one library is installed, it will use that one; if both, it will use ‘fastparquet’", + ), + 'gather_statistics': Field( + Bool, + is_required=False, + description="default is None, Gather the statistics for each dataset partition. By default, this will only be done if the _metadata file is available. Otherwise, statistics will only be gathered if True, because the footer of every file will be parsed (which is very slow on some systems).", + ), + 'split_row_groups:': Field( + Bool, + is_required=False, + description="If True (default) then output dataframe partitions will correspond to parquet-file row-groups (when enough row-group metadata is available). Otherwise, partitions correspond to distinct files. Only the “pyarrow” engine currently supports this argument.", + ), + 'chunksize': Field( + Any, + is_required=False, + description="int or string, The target task partition size. If set, consecutive row-groups from the same file will be aggregated into the same output partition until the aggregate size reaches this value.", + ), + } + ), + 'hdf': Permissive( + { + 'path': Field( + Any, + is_required=True, + description="str or pathlib.Path or list, File pattern (string), pathlib.Path, buffer to read from, or list of file paths. Can contain wildcards.", + ), + 'Key': Field( + Any, + is_required=True, + description="group identifier in the store. 
Can contain wildcards.", + ), + 'start': Field( + Int, + is_required=False, + description="defaults to 0, row number to start at.", + ), + 'stop': Field( + Int, + is_required=False, + description="defaults to None (the last row), row number to stop at.", + ), + 'columns': Field( + list, + is_required=False, + description="A list of columns that if not None, will limit the return columns (default is None).", + ), + 'chunksize': Field( + Any, + is_required=False, + description="Maximal number of rows per partition (default is 1000000).", + ), + 'sorted_index': Field( + Bool, + is_required=False, + description="Option to specify whether or not the input hdf files have a sorted index (default is False).", + ), + 'lock': Field( + Bool, + is_required=False, + description="Option to use a lock to prevent concurrency issues (default is True).", + ), + 'mode': Field( + String, + is_required=False, + description=""" + {‘a’, ‘r’, ‘r+’}, default ‘a’. Mode to use when opening file(s). + ‘r’ - Read-only; no data can be modified. + ‘a’ - Append; an existing file is opened for reading and writing, and if the file does not exist it is created. + ‘r+’ - It is similar to ‘a’, but the file must already exist. + """, + ), + } + ), + 'json': Permissive( + { + 'path': Field( + Any, + is_required=True, + description="str or list, Location to read from. If a string, can include a glob character to find a set of file names. Supports protocol specifications such as 's3://'.", + ), + 'encoding': Field( + String, + is_required=False, + description="The text encoding to implement, e.g., “utf-8”.", + ), + 'errors': Field( + String, + is_required=False, + description="how to respond to errors in the conversion (see str.encode()).", + ), + 'storage_option': Field( + Permissive(), + is_required=False, + description="Passed to backend file-system implementation.", + ), + 'blocksize': Field( + Int, + is_required=False, + description="default is None, If None, files are not blocked, and you get one partition per input file. If int, which can only be used for line-delimited JSON files, each partition will be approximately this size in bytes, to the nearest newline character.", + ), + 'sample': Field( + Int, + is_required=False, + description="Number of bytes to pre-load, to provide an empty dataframe structure to any blocks without data. Only relevant is using blocksize.", + ), + 'compression': Field( + String, + is_required=False, + description="default is None, String like ‘gzip’ or ‘xz’.", + ), + } + ), + 'sql_table': Permissive( + { + 'table': Field( + Any, + is_required=True, + description="str or sqlalchemy expression, Select columns from here.", + ), + 'uri': Field( + String, + is_required=True, + description="Full sqlalchemy URI for the database connection.", + ), + 'index_col': Field( + String, + is_required=True, + description=""" + Column which becomes the index, and defines the partitioning. Should be a indexed column in the SQL server, and any orderable type. If the type is number or time, then partition boundaries can be inferred from npartitions or bytes_per_chunk; otherwide must supply explicit divisions=. index_col could be a function to return a value, e.g., sql.func.abs(sql.column('value')).label('abs(value)'). index_col=sql.func.abs(sql.column("value")).label("abs(value)"), or index_col=cast(sql.column("id"),types.BigInteger).label("id") to convert the textfield id to BigInteger. + Note sql, cast, types methods comes frome sqlalchemy module. 
+ Labeling columns created by functions or arithmetic operations is required + """, + ), + 'divisions': Field( + Any, + is_required=False, + description="sequence, Values of the index column to split the table by. If given, this will override npartitions and bytes_per_chunk. The divisions are the value boundaries of the index column used to define the partitions. For example, divisions=list('acegikmoqsuwz') could be used to partition a string column lexographically into 12 partitions, with the implicit assumption that each partition contains similar numbers of records.", + ), + 'npartitions': Field( + Int, + is_required=False, + description="Number of partitions, if divisions is not given. Will split the values of the index column linearly between limits, if given, or the column max/min. The index column must be numeric or time for this to work.", + ), + 'limits': Field( + Any, + is_required=False, + description="2-tuple or None, Manually give upper and lower range of values for use with npartitions; if None, first fetches max/min from the DB. Upper limit, if given, is inclusive.", + ), + 'columns': Field( + Any, + is_required=False, + description="list of strings or None, Which columns to select; if None, gets all; can include sqlalchemy functions, e.g., sql.func.abs(sql.column('value')).label('abs(value)'). Labeling columns created by functions or arithmetic operations is recommended.", + ), + 'bytes_per_chunk': Field( + Any, + is_required=False, + description="str or int, If both divisions and npartitions is None, this is the target size of each partition, in bytes.", + ), + 'head_rows': Field( + Int, + is_required=False, + description="How many rows to load for inferring the data-types, unless passing meta.", + ), + 'schema': Field( + String, + is_required=False, + description="If using a table name, pass this to sqlalchemy to select which DB schema to use within the URI connection.", + ), + } + ), + 'table': Permissive( + { + 'path': Field( + Any, + is_required=True, + description="str or list, Absolute or relative filepath(s). Prefix with a protocol like 's3://' to read from alternative filesystems. To read from multiple files you can pass a globstring or a list of paths, with the caveat that they must all have the same protocol.", + ), + 'blocksize': Field( + Any, + is_required=False, + description="str or int or None, Number of bytes by which to cut up larger files. Default value is computed based on available physical memory and the number of cores, up to a maximum of 64MB. Can be a number like 64000000` or a string like ``'64MB'. If None, a single block is used for each file.", + ), + 'sample': Field( + Int, + is_required=False, + description="Number of bytes to use when determining dtypes.", + ), + 'assume_missing': Field( + Bool, + is_required=False, + description="If True, all integer columns that aren’t specified in dtype are assumed to contain missing values, and are converted to floats. Default is False.", + ), + 'storage_options': Field( + Permissive(), + is_required=False, + description="Extra options that make sense for a particular storage connection, e.g. host, port, username, password, etc.", + ), + 'include_path_column': Field( + Any, + is_required=False, + description="bool or str, Whether or not to include the path to each particular file. If True a new column is added to the dataframe called path. If str, sets new column name. 
Default is False.", + ), + } + ), + 'fwf': Permissive( + { + 'path': Field( + Any, + is_required=True, + description="str or list, Absolute or relative filepath(s). Prefix with a protocol like 's3://' to read from alternative filesystems. To read from multiple files you can pass a globstring or a list of paths, with the caveat that they must all have the same protocol.", + ), + 'blocksize': Field( + Any, + is_required=False, + description="str or int or None, Number of bytes by which to cut up larger files. Default value is computed based on available physical memory and the number of cores, up to a maximum of 64MB. Can be a number like 64000000` or a string like ``'64MB'. If None, a single block is used for each file.", + ), + 'sample': Field( + Int, + is_required=False, + description="Number of bytes to use when determining dtypes.", + ), + 'assume_missing': Field( + Bool, + is_required=False, + description="If True, all integer columns that aren’t specified in dtype are assumed to contain missing values, and are converted to floats. Default is False.", + ), + 'storage_options': Field( + Permissive(), + is_required=False, + description="Extra options that make sense for a particular storage connection, e.g. host, port, username, password, etc.", + ), + 'include_path_column': Field( + Any, + is_required=False, + description="bool or str, Whether or not to include the path to each particular file. If True a new column is added to the dataframe called path. If str, sets new column name. Default is False.", + ), + } + ), + 'orc': Permissive( + { + 'path': Field( + Any, + is_required=True, + description="str or list, Location of file(s), which can be a full URL with protocol specifier, and may include glob character if a single string.", + ), + 'columns': Field( + list, is_required=False, description="Columns to load. 
If None, loads all.", + ), + 'storage_options': Field( + Permissive(), + is_required=False, + description="Further parameters to pass to the bytes backend.", + ), + } + ), + }, + ) +) +def dataframe_loader(_context, config): + file_type, file_options = list(config.items())[0] + path = file_options.get('path') + + if file_type == 'csv': + return dd.read_csv(path, **dict_without_keys(file_options, 'path')) + elif file_type == 'parquet': + return dd.read_parquet(path, **dict_without_keys(file_options, 'path')) + elif file_type == 'hdf': + return dd.read_hdf(path, **dict_without_keys(file_options, 'path')) + elif file_type == 'json': + return dd.read_json(path, **dict_without_keys(file_options, 'path')) + elif file_type == 'sql_table': + return dd.read_sql_table(**file_options) + elif file_type == 'table': + return dd.read_table(path, **dict_without_keys(file_options, 'path')) + elif file_type == 'fwf': + return dd.read_fwf(path, **dict_without_keys(file_options, 'path')) + elif file_type == 'orc': + return dd.read_orc(path, **dict_without_keys(file_options, 'path')) + else: + raise DagsterInvariantViolationError( + 'Unsupported file_type {file_type}'.format(file_type=file_type) + ) + + +def df_type_check(_, value): + if not isinstance(value, dd.DataFrame): + return TypeCheck(success=False) + return TypeCheck( + success=True, + metadata_entries=[ + EventMetadataEntry.text(str(len(value)), 'row_count', 'Number of rows in DataFrame'), + # string cast columns since they may be things like datetime + EventMetadataEntry.json({'columns': list(map(str, value.columns))}, 'metadata'), + ], + ) + + +DataFrame = DagsterType( + name='DaskDataFrame', + description='''A Dask DataFrame is a large parallel DataFrame composed of many smaller Pandas DataFrames, split along the index. + These Pandas DataFrames may live on disk for larger-than-memory computing on a single machine, or on many different machines in a cluster. + One Dask DataFrame operation triggers many operations on the constituent Pandas DataFrames. 
+ See https://docs.dask.org/en/latest/dataframe.html''', + loader=dataframe_loader, + materializer=dataframe_materializer, + type_check_fn=df_type_check, +) From f491022e2658dc9c0aaf35adf2939ac371e8c66d Mon Sep 17 00:00:00 2001 From: David Katz <41651296+DavidKatz-il@users.noreply.github.com> Date: Wed, 29 Jul 2020 01:16:52 +0300 Subject: [PATCH 2/4] add tests --- .../dagster-dask/dagster_dask_tests/num.csv | 3 + .../dagster-dask/dagster_dask_tests/num.json | 2 + .../dagster_dask_tests/num.parquet | Bin 0 -> 1451 bytes .../dagster_dask_tests/test_data_frame.py | 70 ++++++++++++++++++ .../dagster_dask_tests/test_execute.py | 34 ++++++++- python_modules/libraries/dagster-dask/tox.ini | 2 + 6 files changed, 110 insertions(+), 1 deletion(-) create mode 100644 python_modules/libraries/dagster-dask/dagster_dask_tests/num.csv create mode 100644 python_modules/libraries/dagster-dask/dagster_dask_tests/num.json create mode 100644 python_modules/libraries/dagster-dask/dagster_dask_tests/num.parquet create mode 100644 python_modules/libraries/dagster-dask/dagster_dask_tests/test_data_frame.py diff --git a/python_modules/libraries/dagster-dask/dagster_dask_tests/num.csv b/python_modules/libraries/dagster-dask/dagster_dask_tests/num.csv new file mode 100644 index 0000000000000..fd76dfe8cd884 --- /dev/null +++ b/python_modules/libraries/dagster-dask/dagster_dask_tests/num.csv @@ -0,0 +1,3 @@ +num1,num2 +1,2 +3,4 \ No newline at end of file diff --git a/python_modules/libraries/dagster-dask/dagster_dask_tests/num.json b/python_modules/libraries/dagster-dask/dagster_dask_tests/num.json new file mode 100644 index 0000000000000..5dc40bbc2f4ec --- /dev/null +++ b/python_modules/libraries/dagster-dask/dagster_dask_tests/num.json @@ -0,0 +1,2 @@ +{"num1":1,"num2":2} +{"num1":3,"num2":4} \ No newline at end of file diff --git a/python_modules/libraries/dagster-dask/dagster_dask_tests/num.parquet b/python_modules/libraries/dagster-dask/dagster_dask_tests/num.parquet new file mode 100644 index 0000000000000000000000000000000000000000..923b2fbf1b12f08ab733a1624738e61e7b229052 GIT binary patch literal 1451 zcmcJPziymB5XKi+Bhn?77A+D5S~=*7yOX(`qbRz@P2|E#PO(!(5hAcjh+q%0Oc zH*o0(NRh&29w3j9(vOnDrDqM?KS-4eLM)p3_~y4WV#kMXJ%Wit_Ksn}y9hlVDX6@( zY9)y560|G7KkXA(jR!n$_a&&q)5e-y>#BQ z#?;t{uER=2IZ*x}oG}rk96k|6l7Qk<%#+ApkClb|{4$e5R){#9M4W*d{;=$Fk(hk^ z%EMg7Q=SJ=kOv^2F)lzJi_h}gcc-su;qWODO*}uT=4U0JQ1?t`~AVQ{*cWPh2;5J!Si02 iWlp7Vy#BE7^-f+Nj!%x>oLVCrq4GvIaN3UHn7#+5@DNh~ literal 0 HcmV?d00001 diff --git a/python_modules/libraries/dagster-dask/dagster_dask_tests/test_data_frame.py b/python_modules/libraries/dagster-dask/dagster_dask_tests/test_data_frame.py new file mode 100644 index 0000000000000..671228b26d6d5 --- /dev/null +++ b/python_modules/libraries/dagster-dask/dagster_dask_tests/test_data_frame.py @@ -0,0 +1,70 @@ +import shutil + +import dask.dataframe as dd +import pytest +from dagster_dask import DataFrame +from dask.dataframe.utils import assert_eq + +from dagster import InputDefinition, OutputDefinition, execute_solid, file_relative_path, solid +from dagster.utils.test import get_temp_dir + + +def create_dask_df(): + path = file_relative_path(__file__, 'num.csv') + return dd.read_csv(path) + + +@pytest.mark.parametrize( + 'file_type,read,kwargs', + [ + pytest.param('csv', dd.read_csv, {'index': False}, id='csv'), + pytest.param('parquet', dd.read_parquet, {'write_index': False}, id='parquet'), + pytest.param('json', dd.read_json, {}, id='json'), + ], +) +def test_dataframe_outputs(file_type, read, kwargs): + df = 
create_dask_df() + + @solid(output_defs=[OutputDefinition(dagster_type=DataFrame, name='output_df')]) + def return_df(_): + return df + + with get_temp_dir() as temp_path: + shutil.rmtree(temp_path) + result = execute_solid( + return_df, + run_config={ + 'solids': { + 'return_df': { + 'outputs': [{'output_df': {file_type: {'path': temp_path, **kwargs}}}] + } + } + }, + ) + assert result.success + actual = read(f"{temp_path}/*") + assert assert_eq(actual, df) + + +@pytest.mark.parametrize( + 'file_type', + [ + pytest.param('csv', id='csv'), + pytest.param('parquet', id='parquet'), + pytest.param('json', id='json'), + ], +) +def test_dataframe_inputs(file_type): + @solid(input_defs=[InputDefinition(dagster_type=DataFrame, name='input_df')]) + def return_df(_, input_df): + return input_df + + file_name = file_relative_path(__file__, f"num.{file_type}") + result = execute_solid( + return_df, + run_config={ + 'solids': {'return_df': {'inputs': {'input_df': {file_type: {'path': file_name}}}}} + }, + ) + assert result.success + assert assert_eq(result.output_value(), create_dask_df()) diff --git a/python_modules/libraries/dagster-dask/dagster_dask_tests/test_execute.py b/python_modules/libraries/dagster-dask/dagster_dask_tests/test_execute.py index 6baf014d2a6e6..88d5a4dc919cb 100644 --- a/python_modules/libraries/dagster-dask/dagster_dask_tests/test_execute.py +++ b/python_modules/libraries/dagster-dask/dagster_dask_tests/test_execute.py @@ -1,5 +1,5 @@ import dagster_pandas as dagster_pd -from dagster_dask import dask_executor +from dagster_dask import DataFrame, dask_executor from dagster import ( InputDefinition, @@ -88,3 +88,35 @@ def test_pandas_dask(): ) assert result.success + + +@solid(input_defs=[InputDefinition('df', DataFrame)]) +def dask_solid(_, df): # pylint: disable=unused-argument + pass + + +@pipeline(mode_defs=[ModeDefinition(executor_defs=default_executors + [dask_executor])]) +def dask_pipeline(): + return dask_solid() + + +def test_dask(): + run_config = { + 'solids': { + 'dask_solid': { + 'inputs': {'df': {'csv': {'path': file_relative_path(__file__, 'ex*.csv')}}} + } + } + } + + result = execute_pipeline( + ReconstructablePipeline.for_file(__file__, dask_pipeline.name), + run_config={ + 'storage': {'filesystem': {}}, + 'execution': {'dask': {'config': {'cluster': {'local': {'timeout': 30}}}}}, + **run_config, + }, + instance=DagsterInstance.local_temp(), + ) + + assert result.success diff --git a/python_modules/libraries/dagster-dask/tox.ini b/python_modules/libraries/dagster-dask/tox.ini index 834b0ec297176..edbd10f9b8399 100644 --- a/python_modules/libraries/dagster-dask/tox.ini +++ b/python_modules/libraries/dagster-dask/tox.ini @@ -19,6 +19,8 @@ commands = !windows: /bin/bash -c '! 
pip list --exclude-editable | grep -e dagster -e dagit' coverage erase echo -e "--- \033[0;32m:pytest: Running tox tests\033[0m" + python -m pip install "dask[dataframe]" --upgrade + pip install pyarrow pytest -vv --junitxml=test_results.xml --cov=dagster_dask --cov-append --cov-report= coverage report --omit='.tox/*,**/test_*.py' --skip-covered coverage html --omit='.tox/*,**/test_*.py' From 0e977abfbc25e24fb1e6a0d6a353bbaeef15adc6 Mon Sep 17 00:00:00 2001 From: David Katz <41651296+DavidKatz-il@users.noreply.github.com> Date: Thu, 30 Jul 2020 15:01:11 +0300 Subject: [PATCH 3/4] fix `description` --- .../dagster-dask/dagster_dask/data_frame.py | 367 ++++++++++++++---- 1 file changed, 291 insertions(+), 76 deletions(-) diff --git a/python_modules/libraries/dagster-dask/dagster_dask/data_frame.py b/python_modules/libraries/dagster-dask/dagster_dask/data_frame.py index e48805b173c84..c5be46053a5a8 100644 --- a/python_modules/libraries/dagster-dask/dagster_dask/data_frame.py +++ b/python_modules/libraries/dagster-dask/dagster_dask/data_frame.py @@ -46,12 +46,21 @@ def dict_without_keys(ddict, *keys): 'single_file': Field( Bool, is_required=False, - description="Whether to save everything into a single CSV file. Under the single file mode, each partition is appended at the end of the specified CSV file. Note that not all filesystems support the append mode and thus the single file mode, especially on cloud storage systems such as S3 or GCS. A warning will be issued when writing to a file that is not backed by a local filesystem.", + description=""" + Whether to save everything into a single CSV file. + Under the single file mode, each partition is appended at the end of the specified CSV file. + Note that not all filesystems support the append mode and thus the single file mode, + especially on cloud storage systems such as S3 or GCS. + A warning will be issued when writing to a file that is not backed by a local filesystem. + """, ), 'encoding': Field( String, is_required=False, - description="A string representing the encoding to use in the output file, defaults to 'ascii' on Python 2 and 'utf-8' on Python 3.", + description=""" + A string representing the encoding to use in the output file, + defaults to 'ascii' on Python 2 and 'utf-8' on Python 3. + """, ), 'mode': Field( String, is_required=False, description="Python write mode, default 'w'", @@ -59,12 +68,18 @@ def dict_without_keys(ddict, *keys): 'compression': Field( WriteCompressionTextOptions, is_required=False, - description="a string representing the compression to use in the output file, allowed values are 'gzip', 'bz2', 'xz'", + description=""" + a string representing the compression to use in the output file, + allowed values are 'gzip', 'bz2', 'xz'. + """, ), 'compute': Field( Bool, is_required=False, - description="If true, immediately executes. If False, returns a set of delayed objects, which can be computed at a later time.", + description=""" + If true, immediately executes. + If False, returns a set of delayed objects, which can be computed at a later time. + """, ), 'storage_options': Field( Permissive(), @@ -74,7 +89,13 @@ def dict_without_keys(ddict, *keys): 'header_first_partition_only': Field( Bool, is_required=False, - description="If set to `True`, only write the header row in the first output file. By default, headers are written to all partitions under the multiple file mode (`single_file` is `False`) and written only once under the single file mode (`single_file` is `True`). 
It must not be `False` under the single file mode.", + description=""" + If set to `True`, only write the header row in the first output file. + By default, headers are written to all partitions + under the multiple file mode (`single_file` is `False`) + and written only once under the single file mode (`single_file` is `True`). + It must not be `False` under the single file mode. + """, ), 'compute_kwargs': Field( Permissive(), @@ -88,17 +109,27 @@ def dict_without_keys(ddict, *keys): 'path': Field( Any, is_required=True, - description="str or pathlib.Path, Destination directory for data. Prepend with protocol like ``s3://`` or ``hdfs://`` for remote data.", + description=""" + str or pathlib.Path, Destination directory for data. + Prepend with protocol like ``s3://`` or ``hdfs://`` for remote data. + """, ), 'engine': Field( EngineParquetOptions, is_required=False, - description="{'auto', 'fastparquet', 'pyarrow'}, default 'auto' Parquet library to use. If only one library is installed, it will use that one; if both, it will use 'fastparquet'.", + description=""" + {'auto', 'fastparquet', 'pyarrow'}, default 'auto' Parquet library to use. + If only one library is installed, it will use that one; if both, it will use 'fastparquet'. + """, ), 'compression': Field( Any, is_required=False, - description="str or dict, optional Either a string like ``'snappy'`` or a dictionary mapping column names to compressors like ``{'name': 'gzip', 'values': 'snappy'}``. The default is ``'default'``, which uses the default compression for whichever engine is selected.", + description=""" + str or dict, optional Either a string like ``'snappy'`` + or a dictionary mapping column names to compressors like ``{'name': 'gzip', 'values': 'snappy'}``. + The default is ``'default'``, which uses the default compression for whichever engine is selected. + """, ), 'write_index': Field( Bool, @@ -108,17 +139,27 @@ def dict_without_keys(ddict, *keys): 'append': Field( Bool, is_required=False, - description="If False (default), construct data-set from scratch. If True, add new row-group(s) to an existing data-set. In the latter case, the data-set must exist, and the schema must match the input data.", + description=""" + If False (default), construct data-set from scratch. + If True, add new row-group(s) to an existing data-set. + In the latter case, the data-set must exist, and the schema must match the input data. + """, ), 'ignore_divisions': Field( Bool, is_required=False, - description="If False (default) raises error when previous divisions overlap with the new appended divisions. Ignored if append=False.", + description=""" + If False (default) raises error when previous divisions overlap with the new appended divisions. + Ignored if append=False. + """, ), 'partition_on': Field( list, is_required=False, - description="Construct directory-based partitioning by splitting on these fields values. Each dask partition will result in one or more datafiles, there will be no global groupby.", + description=""" + Construct directory-based partitioning by splitting on these fields values. + Each dask partition will result in one or more datafiles, there will be no global groupby. + """, ), 'storage_options': Field( Permissive(), @@ -133,7 +174,10 @@ def dict_without_keys(ddict, *keys): 'compute': Field( Bool, is_required=False, - description="If True (default) then the result is computed immediately. 
If False then a ``dask.delayed`` object is returned for future computation.", + description=""" + If True (default) then the result is computed immediately. + If False then a ``dask.delayed`` object is returned for future computation. + """, ), 'compute_kwargs': Field( Permissive(), @@ -147,22 +191,32 @@ def dict_without_keys(ddict, *keys): 'path': Field( Any, is_required=True, - description="str or pathlib.Path, Path to a target filename. Supports strings, ``pathlib.Path``, or any object implementing the ``__fspath__`` protocol. May contain a ``*`` to denote many filenames.", + description=""" + str or pathlib.Path, Path to a target filename. + Supports strings, ``pathlib.Path``, or any object implementing the ``__fspath__`` protocol. + May contain a ``*`` to denote many filenames. + """, ), 'key': Field( String, is_required=True, - description="Datapath within the files. May contain a ``*`` to denote many locations", + description=""" + Datapath within the files. + May contain a ``*`` to denote many locations. + """, ), 'compute': Field( Bool, is_required=False, - description="Whether or not to execute immediately. If False then this returns a ``dask.Delayed`` value.", + description=""" + Whether or not to execute immediately. + If False then this returns a ``dask.Delayed`` value. + """, ), 'scheduler': Field( String, is_required=False, - description="The scheduler to use, like 'threads' or 'processes'", + description="The scheduler to use, like 'threads' or 'processes'.", ), } ), @@ -171,17 +225,23 @@ def dict_without_keys(ddict, *keys): 'path': Field( Any, is_required=True, - description="str or list, Location to write to. If a string, and there are more than one partitions in df, should include a glob character to expand into a set of file names, or provide a ``name_function=`` parameter. Supports protocol specifications such as ``'s3://'``.", + description=""" + str or list, Location to write to. + If a string, and there are more than one partitions in df, + should include a glob character to expand into a set of file names, + or provide a ``name_function=`` parameter. + Supports protocol specifications such as ``'s3://'``. + """, ), 'encoding': Field( String, is_required=False, - description="default is 'utf-8', The text encoding to implement, e.g., 'utf-8'", + description="default is 'utf-8', The text encoding to implement, e.g., 'utf-8'.", ), 'errors': Field( String, is_required=False, - description="default is 'strict', how to respond to errors in the conversion (see ``str.encode()``)", + description="default is 'strict', how to respond to errors in the conversion (see ``str.encode()``).", ), 'storage_options': Field( Permissive(), @@ -191,7 +251,10 @@ def dict_without_keys(ddict, *keys): 'compute': Field( Bool, is_required=False, - description="If true, immediately executes. If False, returns a set of delayed objects, which can be computed at a later time.", + description=""" + If true, immediately executes. + If False, returns a set of delayed objects, which can be computed at a later time. + """, ), 'compute_kwargs': Field( Permissive(), @@ -230,22 +293,37 @@ def dict_without_keys(ddict, *keys): 'index': Field( Bool, is_required=False, - description="default is True, Write DataFrame index as a column. Uses `index_label` as the column name in the table.", + description=""" + default is True, Write DataFrame index as a column. + Uses `index_label` as the column name in the table. 
+ """, ), 'index_label': Field( Any, is_required=False, - description="str or sequence, default None Column label for index column(s). If None is given (default) and `index` is True, then the index names are used. A sequence should be given if the DataFrame uses MultiIndex.", + description=""" + str or sequence, default None Column label for index column(s). + If None is given (default) and `index` is True, then the index names are used. + A sequence should be given if the DataFrame uses MultiIndex. + """, ), 'chunksize': Field( Int, is_required=False, - description="Specify the number of rows in each batch to be written at a time. By default, all rows will be written at once", + description=""" + Specify the number of rows in each batch to be written at a time. + By default, all rows will be written at once. + """, ), 'dtype': Field( Any, is_required=False, - description="dict or scalar, Specifying the datatype for columns. If a dictionary is used, the keys should be the column names and the values should be the SQLAlchemy types or strings for the sqlite3 legacy mode. If a scalar is provided, it will be applied to all columns.", + description=""" + dict or scalar, Specifying the datatype for columns. + If a dictionary is used, the keys should be the column names + and the values should be the SQLAlchemy types or strings for the sqlite3 legacy mode. + If a scalar is provided, it will be applied to all columns. + """, ), 'method': Field( String, @@ -263,12 +341,19 @@ def dict_without_keys(ddict, *keys): 'compute': Field( Bool, is_required=False, - description="default is True, When true, call dask.compute and perform the load into SQL; otherwise, return a Dask object (or array of per-block objects when parallel=True)", + description=""" + default is True, When true, call dask.compute and perform the load into SQL; + otherwise, return a Dask object (or array of per-block objects when parallel=True). + """, ), 'parallel': Field( Bool, is_required=False, - description="default is False, When true, have each block append itself to the DB table concurrently. This can result in DB rows being in a different order than the source DataFrame's corresponding rows. When false, load each block into the SQL DB in sequence.", + description=""" + default is False, When true, have each block append itself to the DB table concurrently. + This can result in DB rows being in a different order than the source DataFrame's corresponding rows. + When false, load each block into the SQL DB in sequence. + """, ), }, ), @@ -304,12 +389,21 @@ def dataframe_materializer(_context, config, dask_df): 'path': Field( Any, is_required=True, - description="str or list, Absolute or relative filepath(s). Prefix with a protocol like `s3://` to read from alternative filesystems. To read from multiple files you can pass a globstring or a list of paths, with the caveat that they must all have the same protocol.", + description=""" + str or list, Absolute or relative filepath(s). + Prefix with a protocol like `s3://` to read from alternative filesystems. + To read from multiple files you can pass a globstring or a list of paths, + with the caveat that they must all have the same protocol. + """, ), 'blocksize': Field( Any, is_required=False, - description="str or int or None, Number of bytes by which to cut up larger files. Default value is computed based on available physical memory and the number of cores, up to a maximum of 64MB. Can be a number like 64000000` or a string like ``'64MB'. 
If None, a single block is used for each file.", + description=""" + str or int or None, Number of bytes by which to cut up larger files. + Default value is computed based on available physical memory and the number of cores, up to a maximum of 64MB. + Can be a number like 64000000` or a string like ``'64MB'. If None, a single block is used for each file. + """, ), 'sample': Field( Int, @@ -319,17 +413,27 @@ def dataframe_materializer(_context, config, dask_df): 'assume_missing': Field( Bool, is_required=False, - description="If True, all integer columns that aren’t specified in `dtype` are assumed to contain missing values, and are converted to floats. Default is False.", + description=""" + If True, all integer columns that aren’t specified in `dtype` are assumed to contain missing values, + and are converted to floats. Default is False. + """, ), 'storage_options': Field( Permissive(), is_required=False, - description="Extra options that make sense for a particular storage connection, e.g. host, port, username, password, etc.", + description=""" + Extra options that make sense for a particular storage connection, + e.g. host, port, username, password, etc. + """, ), 'include_path_column': Field( Any, is_required=False, - description="bool or str, Whether or not to include the path to each particular file. If True a new column is added to the dataframe called path. If str, sets new column name. Default is False.", + description=""" + bool or str, Whether or not to include the path to each particular file. + If True a new column is added to the dataframe called path. + If str, sets new column name. Default is False. + """, ), } ), @@ -338,33 +442,41 @@ def dataframe_materializer(_context, config, dask_df): 'path': Field( Any, is_required=True, - description="str or list, Source directory for data, or path(s) to individual parquet files. Prefix with a protocol like s3:// to read from alternative filesystems. To read from multiple files you can pass a globstring or a list of paths, with the caveat that they must all have the same protocol.", + description=""" + str or list, Source directory for data, or path(s) to individual parquet files. + Prefix with a protocol like s3:// to read from alternative filesystems. + To read from multiple files you can pass a globstring or a list of paths, + with the caveat that they must all have the same protocol. + """, ), 'columns': Field( - Any, - is_required=False, - description="str or list or None (default), Field name(s) to read in as columns in the output. By default all non-index fields will be read (as determined by the pandas parquet metadata, if present). Provide a single field name instead of a list to read in the data as a Series.", - ), - 'filters': Field( Any, is_required=False, description=""" - Union[List[Tuple[str, str, Any]], List[List[Tuple[str, str, Any]]]], - List of filters to apply, like [[('x', '=', 0), ...], ...]. - This implements partition-level (hive) filtering only, i.e., to prevent the loading of some row-groups and/or files. - Predicates can be expressed in disjunctive normal form (DNF). This means that the innermost tuple describes a single column predicate. These inner predicates are combined with an AND conjunction into a larger predicate. The outer-most list then combines all of the combined filters with an OR disjunction. - Predicates can also be expressed as a List[Tuple]. These are evaluated as an AND conjunction. To express OR in predictates, one must use the (preferred) List[List[Tuple]] notation. 
+ str or list or None (default), Field name(s) to read in as columns in the output. + By default all non-index fields will be read (as determined by the pandas parquet metadata, if present). + Provide a single field name instead of a list to read in the data as a Series. """, ), 'index': Field( Any, is_required=False, - description="list or False or None (default), Field name(s) to use as the output frame index. By default will be inferred from the pandas parquet file metadata (if present). Use False to read all fields as columns.", + description=""" + list or False or None (default), Field name(s) to use as the output frame index. + By default will be inferred from the pandas parquet file metadata (if present). + Use False to read all fields as columns. + """, ), 'categories': Field( Any, is_required=False, - description="list or dict or None, For any fields listed here, if the parquet encoding is Dictionary, the column will be created with dtype category. Use only if it is guaranteed that the column is encoded as dictionary in all row-groups. If a list, assumes up to 2**16-1 labels; if a dict, specify the number of labels expected; if None, will load categories automatically for data written by dask/fastparquet, not otherwise.", + description=""" + list or dict or None, For any fields listed here, + if the parquet encoding is Dictionary, the column will be created with dtype category. + Use only if it is guaranteed that the column is encoded as dictionary in all row-groups. + If a list, assumes up to 2**16-1 labels; if a dict, specify the number of labels expected; + if None, will load categories automatically for data written by dask/fastparquet, not otherwise. + """, ), 'storage_options': Field( Permissive(), @@ -374,22 +486,40 @@ def dataframe_materializer(_context, config, dask_df): 'engine': Field( EngineParquetOptions, is_required=False, - description="Parquet reader library to use. If only one library is installed, it will use that one; if both, it will use ‘fastparquet’", + description=""" + Parquet reader library to use. + If only one library is installed, it will use that one; + if both, it will use ‘fastparquet’. + """, ), 'gather_statistics': Field( Bool, is_required=False, - description="default is None, Gather the statistics for each dataset partition. By default, this will only be done if the _metadata file is available. Otherwise, statistics will only be gathered if True, because the footer of every file will be parsed (which is very slow on some systems).", + description=""" + default is None, Gather the statistics for each dataset partition. + By default, this will only be done if the _metadata file is available. + Otherwise, statistics will only be gathered if True, + because the footer of every file will be parsed (which is very slow on some systems). + """, ), 'split_row_groups:': Field( Bool, is_required=False, - description="If True (default) then output dataframe partitions will correspond to parquet-file row-groups (when enough row-group metadata is available). Otherwise, partitions correspond to distinct files. Only the “pyarrow” engine currently supports this argument.", + description=""" + If True (default) then output dataframe partitions will correspond + to parquet-file row-groups (when enough row-group metadata is available). + Otherwise, partitions correspond to distinct files. + Only the “pyarrow” engine currently supports this argument. + """, ), 'chunksize': Field( Any, is_required=False, - description="int or string, The target task partition size. 
If set, consecutive row-groups from the same file will be aggregated into the same output partition until the aggregate size reaches this value.", + description=""" + int or string, The target task partition size. + If set, consecutive row-groups from the same file will be aggregated + into the same output partition until the aggregate size reaches this value. + """, ), } ), @@ -398,7 +528,11 @@ def dataframe_materializer(_context, config, dask_df): 'path': Field( Any, is_required=True, - description="str or pathlib.Path or list, File pattern (string), pathlib.Path, buffer to read from, or list of file paths. Can contain wildcards.", + description=""" + str or pathlib.Path or list, + File pattern (string), pathlib.Path, buffer to read from, or list of file paths. + Can contain wildcards. + """, ), 'Key': Field( Any, @@ -452,7 +586,11 @@ def dataframe_materializer(_context, config, dask_df): 'path': Field( Any, is_required=True, - description="str or list, Location to read from. If a string, can include a glob character to find a set of file names. Supports protocol specifications such as 's3://'.", + description=""" + str or list, Location to read from. + If a string, can include a glob character to find a set of file names. + Supports protocol specifications such as 's3://'. + """, ), 'encoding': Field( String, @@ -472,12 +610,20 @@ def dataframe_materializer(_context, config, dask_df): 'blocksize': Field( Int, is_required=False, - description="default is None, If None, files are not blocked, and you get one partition per input file. If int, which can only be used for line-delimited JSON files, each partition will be approximately this size in bytes, to the nearest newline character.", + description=""" + default is None, If None, files are not blocked, and you get one partition per input file. + If int, which can only be used for line-delimited JSON files, + each partition will be approximately this size in bytes, to the nearest newline character. + """, ), 'sample': Field( Int, is_required=False, - description="Number of bytes to pre-load, to provide an empty dataframe structure to any blocks without data. Only relevant is using blocksize.", + description=""" + Number of bytes to pre-load, + to provide an empty dataframe structure to any blocks without data. + Only relevant is using blocksize. + """, ), 'compression': Field( String, @@ -502,35 +648,55 @@ def dataframe_materializer(_context, config, dask_df): String, is_required=True, description=""" - Column which becomes the index, and defines the partitioning. Should be a indexed column in the SQL server, and any orderable type. If the type is number or time, then partition boundaries can be inferred from npartitions or bytes_per_chunk; otherwide must supply explicit divisions=. index_col could be a function to return a value, e.g., sql.func.abs(sql.column('value')).label('abs(value)'). index_col=sql.func.abs(sql.column("value")).label("abs(value)"), or index_col=cast(sql.column("id"),types.BigInteger).label("id") to convert the textfield id to BigInteger. - Note sql, cast, types methods comes frome sqlalchemy module. - Labeling columns created by functions or arithmetic operations is required + Column which becomes the index, and defines the partitioning. + Should be a indexed column in the SQL server, and any orderable type. + If the type is number or time, then partition boundaries can be inferred from npartitions or bytes_per_chunk; + otherwide must supply explicit divisions=. 
+ index_col could be a function to return a value, e.g., sql.func.abs(sql.column('value')).label('abs(value)'). + index_col=sql.func.abs(sql.column("value")).label("abs(value)"), + or index_col=cast(sql.column("id"),types.BigInteger).label("id") to convert the textfield id to BigInteger. + Note sql, cast, types methods comes frome sqlalchemy module. + Labeling columns created by functions or arithmetic operations is required """, ), 'divisions': Field( Any, is_required=False, - description="sequence, Values of the index column to split the table by. If given, this will override npartitions and bytes_per_chunk. The divisions are the value boundaries of the index column used to define the partitions. For example, divisions=list('acegikmoqsuwz') could be used to partition a string column lexographically into 12 partitions, with the implicit assumption that each partition contains similar numbers of records.", + description=""" + sequence, Values of the index column to split the table by. + If given, this will override npartitions and bytes_per_chunk. + The divisions are the value boundaries of the index column used to define the partitions. + For example, divisions=list('acegikmoqsuwz') could be used + to partition a string column lexographically into 12 partitions, + with the implicit assumption that each partition contains similar numbers of records. + """, ), 'npartitions': Field( Int, is_required=False, - description="Number of partitions, if divisions is not given. Will split the values of the index column linearly between limits, if given, or the column max/min. The index column must be numeric or time for this to work.", - ), - 'limits': Field( - Any, - is_required=False, - description="2-tuple or None, Manually give upper and lower range of values for use with npartitions; if None, first fetches max/min from the DB. Upper limit, if given, is inclusive.", + description=""" + Number of partitions, if divisions is not given. + Will split the values of the index column linearly between limits, if given, or the column max/min. + The index column must be numeric or time for this to work. + """, ), 'columns': Field( Any, is_required=False, - description="list of strings or None, Which columns to select; if None, gets all; can include sqlalchemy functions, e.g., sql.func.abs(sql.column('value')).label('abs(value)'). Labeling columns created by functions or arithmetic operations is recommended.", + description=""" + list of strings or None, Which columns to select; + if None, gets all; can include sqlalchemy functions, + e.g., sql.func.abs(sql.column('value')).label('abs(value)'). + Labeling columns created by functions or arithmetic operations is recommended. + """, ), 'bytes_per_chunk': Field( Any, is_required=False, - description="str or int, If both divisions and npartitions is None, this is the target size of each partition, in bytes.", + description=""" + str or int, If both divisions and npartitions is None, + this is the target size of each partition, in bytes. + """, ), 'head_rows': Field( Int, @@ -540,7 +706,10 @@ def dataframe_materializer(_context, config, dask_df): 'schema': Field( String, is_required=False, - description="If using a table name, pass this to sqlalchemy to select which DB schema to use within the URI connection.", + description=""" + If using a table name, pass this to sqlalchemy to select + which DB schema to use within the URI connection. 
+ """, ), } ), @@ -549,12 +718,22 @@ def dataframe_materializer(_context, config, dask_df): 'path': Field( Any, is_required=True, - description="str or list, Absolute or relative filepath(s). Prefix with a protocol like 's3://' to read from alternative filesystems. To read from multiple files you can pass a globstring or a list of paths, with the caveat that they must all have the same protocol.", + description=""" + str or list, Absolute or relative filepath(s). + Prefix with a protocol like 's3://' to read from alternative filesystems. + To read from multiple files you can pass a globstring or a list of paths, + with the caveat that they must all have the same protocol. + """, ), 'blocksize': Field( Any, is_required=False, - description="str or int or None, Number of bytes by which to cut up larger files. Default value is computed based on available physical memory and the number of cores, up to a maximum of 64MB. Can be a number like 64000000` or a string like ``'64MB'. If None, a single block is used for each file.", + description=""" + str or int or None, Number of bytes by which to cut up larger files. + Default value is computed based on available physical memory and the number of cores, + up to a maximum of 64MB. Can be a number like 64000000` or a string like ``'64MB'. + If None, a single block is used for each file. + """, ), 'sample': Field( Int, @@ -564,17 +743,27 @@ def dataframe_materializer(_context, config, dask_df): 'assume_missing': Field( Bool, is_required=False, - description="If True, all integer columns that aren’t specified in dtype are assumed to contain missing values, and are converted to floats. Default is False.", + description=""" + If True, all integer columns that aren’t specified in dtype are assumed to contain missing values, + and are converted to floats. Default is False. + """, ), 'storage_options': Field( Permissive(), is_required=False, - description="Extra options that make sense for a particular storage connection, e.g. host, port, username, password, etc.", + description=""" + Extra options that make sense for a particular storage connection, + e.g. host, port, username, password, etc. + """, ), 'include_path_column': Field( Any, is_required=False, - description="bool or str, Whether or not to include the path to each particular file. If True a new column is added to the dataframe called path. If str, sets new column name. Default is False.", + description=""" + bool or str, Whether or not to include the path to each particular file. + If True a new column is added to the dataframe called path. + If str, sets new column name. Default is False. + """, ), } ), @@ -583,12 +772,23 @@ def dataframe_materializer(_context, config, dask_df): 'path': Field( Any, is_required=True, - description="str or list, Absolute or relative filepath(s). Prefix with a protocol like 's3://' to read from alternative filesystems. To read from multiple files you can pass a globstring or a list of paths, with the caveat that they must all have the same protocol.", + description=""" + str or list, Absolute or relative filepath(s). + Prefix with a protocol like 's3://' to read from alternative filesystems. + To read from multiple files you can pass a globstring or a list of paths, + with the caveat that they must all have the same protocol. + """, ), 'blocksize': Field( Any, is_required=False, - description="str or int or None, Number of bytes by which to cut up larger files. 
Default value is computed based on available physical memory and the number of cores, up to a maximum of 64MB. Can be a number like 64000000` or a string like ``'64MB'. If None, a single block is used for each file.", + description=""" + str or int or None, Number of bytes by which to cut up larger files. + Default value is computed based on available physical memory + and the number of cores up to a maximum of 64MB. + Can be a number like 64000000` or a string like ``'64MB'. + If None, a single block is used for each file. + """, ), 'sample': Field( Int, @@ -598,17 +798,28 @@ def dataframe_materializer(_context, config, dask_df): 'assume_missing': Field( Bool, is_required=False, - description="If True, all integer columns that aren’t specified in dtype are assumed to contain missing values, and are converted to floats. Default is False.", + description=""" + If True, all integer columns that aren’t specified in dtype are assumed + to contain missing values, and are converted to floats. + Default is False. + """, ), 'storage_options': Field( Permissive(), is_required=False, - description="Extra options that make sense for a particular storage connection, e.g. host, port, username, password, etc.", + description=""" + Extra options that make sense for a particular storage connection, + e.g. host, port, username, password, etc. + """, ), 'include_path_column': Field( Any, is_required=False, - description="bool or str, Whether or not to include the path to each particular file. If True a new column is added to the dataframe called path. If str, sets new column name. Default is False.", + description=""" + bool or str, Whether or not to include the path to each particular file. + If True a new column is added to the dataframe called path. + If str, sets new column name. Default is False. + """, ), } ), @@ -617,7 +828,11 @@ def dataframe_materializer(_context, config, dask_df): 'path': Field( Any, is_required=True, - description="str or list, Location of file(s), which can be a full URL with protocol specifier, and may include glob character if a single string.", + description=""" + str or list, Location of file(s), + which can be a full URL with protocol specifier, + and may include glob character if a single string. + """, ), 'columns': Field( list, is_required=False, description="Columns to load. If None, loads all.", From 0df68941b1836f43265eefe729626bb63e43f418 Mon Sep 17 00:00:00 2001 From: David Katz <41651296+DavidKatz-il@users.noreply.github.com> Date: Thu, 30 Jul 2020 15:01:24 +0300 Subject: [PATCH 4/4] fix tests --- python_modules/libraries/dagster-dask/dev-requirements.txt | 2 ++ python_modules/libraries/dagster-dask/setup.py | 2 +- python_modules/libraries/dagster-dask/tox.ini | 3 +-- 3 files changed, 4 insertions(+), 3 deletions(-) create mode 100644 python_modules/libraries/dagster-dask/dev-requirements.txt diff --git a/python_modules/libraries/dagster-dask/dev-requirements.txt b/python_modules/libraries/dagster-dask/dev-requirements.txt new file mode 100644 index 0000000000000..913aa24c35d20 --- /dev/null +++ b/python_modules/libraries/dagster-dask/dev-requirements.txt @@ -0,0 +1,2 @@ +# we need `pyarrow` for testing read/write parquet files. 
+pyarrow diff --git a/python_modules/libraries/dagster-dask/setup.py b/python_modules/libraries/dagster-dask/setup.py index ff179388bda1b..ede744eaa26f1 100644 --- a/python_modules/libraries/dagster-dask/setup.py +++ b/python_modules/libraries/dagster-dask/setup.py @@ -30,7 +30,7 @@ def get_version(): 'bokeh', 'dagster', 'dagster_graphql', - 'dask>=1.2.2', + 'dask[dataframe]>=1.2.2', 'distributed>=1.28.1', ], extras_require={ diff --git a/python_modules/libraries/dagster-dask/tox.ini b/python_modules/libraries/dagster-dask/tox.ini index edbd10f9b8399..bee85841ed383 100644 --- a/python_modules/libraries/dagster-dask/tox.ini +++ b/python_modules/libraries/dagster-dask/tox.ini @@ -10,6 +10,7 @@ deps = -e ../dagster-aws -e ../dagster-pandas -e . + -r dev-requirements.txt extras = yarn,pbs,kube usedevelop = true whitelist_externals = @@ -19,8 +20,6 @@ commands = !windows: /bin/bash -c '! pip list --exclude-editable | grep -e dagster -e dagit' coverage erase echo -e "--- \033[0;32m:pytest: Running tox tests\033[0m" - python -m pip install "dask[dataframe]" --upgrade - pip install pyarrow pytest -vv --junitxml=test_results.xml --cov=dagster_dask --cov-append --cov-report= coverage report --omit='.tox/*,**/test_*.py' --skip-covered coverage html --omit='.tox/*,**/test_*.py'
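
Example usage (illustrative only, not part of the patch): a minimal sketch of how the new DataFrame type could be exercised once this lands. It assumes the loader and the materializer both expose a 'csv' selector with a required 'path' field, as the config schemas in data_frame.py above indicate; the solid name, column names, and file paths below are placeholders.

    from dagster import InputDefinition, OutputDefinition, execute_pipeline, pipeline, solid
    from dagster_dask import DataFrame

    @solid(
        input_defs=[InputDefinition('df', DataFrame)],
        output_defs=[OutputDefinition(DataFrame)],
    )
    def add_total(_, df):
        # 'price' and 'quantity' are hypothetical columns used only for illustration.
        df['total'] = df['price'] * df['quantity']
        return df

    @pipeline
    def totals_pipeline():
        add_total()

    # The 'csv' input selector is expected to load the input via dask.dataframe.read_csv,
    # and the output selector writes it back out through the materializer; 'single_file'
    # mirrors the csv materializer option documented above.
    run_config = {
        'solids': {
            'add_total': {
                'inputs': {'df': {'csv': {'path': 'input_*.csv'}}},
                'outputs': [{'result': {'csv': {'path': 'totals.csv', 'single_file': True}}}],
            }
        }
    }

    execute_pipeline(totals_pipeline, run_config=run_config)

Keeping the sketch to the csv selector avoids depending on optional engines; parquet reads and writes would additionally require pyarrow or fastparquet, which is why dev-requirements.txt adds pyarrow for the test suite.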