diff --git a/flytekit/types/structured/basic_dfs.py b/flytekit/types/structured/basic_dfs.py index 0d07fda770..ff9d692cec 100644 --- a/flytekit/types/structured/basic_dfs.py +++ b/flytekit/types/structured/basic_dfs.py @@ -60,7 +60,7 @@ def decode( path = flyte_value.uri local_dir = ctx.file_access.get_random_local_directory() ctx.file_access.get_data(path, local_dir, is_multipart=True) - if current_task_metadata and current_task_metadata.structured_dataset_type.columns: + if current_task_metadata.structured_dataset_type and current_task_metadata.structured_dataset_type.columns: columns = [c.name for c in current_task_metadata.structured_dataset_type.columns] return pd.read_parquet(local_dir, columns=columns) return pd.read_parquet(local_dir) @@ -98,7 +98,7 @@ def decode( path = flyte_value.uri local_dir = ctx.file_access.get_random_local_directory() ctx.file_access.get_data(path, local_dir, is_multipart=True) - if current_task_metadata and current_task_metadata.structured_dataset_type.columns: + if current_task_metadata.structured_dataset_type and current_task_metadata.structured_dataset_type.columns: columns = [c.name for c in current_task_metadata.structured_dataset_type.columns] return pq.read_table(local_dir, columns=columns) return pq.read_table(local_dir) diff --git a/flytekit/types/structured/bigquery.py b/flytekit/types/structured/bigquery.py index d0212c246b..aa0ef42f6b 100644 --- a/flytekit/types/structured/bigquery.py +++ b/flytekit/types/structured/bigquery.py @@ -38,7 +38,7 @@ def _read_from_bq( parent = "projects/{}".format(project_id) read_options = None - if current_task_metadata and current_task_metadata.structured_dataset_type.columns: + if current_task_metadata.structured_dataset_type and current_task_metadata.structured_dataset_type.columns: columns = [c.name for c in current_task_metadata.structured_dataset_type.columns] read_options = types.ReadSession.TableReadOptions(selected_fields=columns) diff --git a/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/arrow.py b/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/arrow.py index 45a23ed222..d47318666f 100644 --- a/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/arrow.py +++ b/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/arrow.py @@ -55,7 +55,7 @@ def decode( _, path = split_protocol(uri) columns = None - if current_task_metadata and current_task_metadata.structured_dataset_type.columns: + if current_task_metadata.structured_dataset_type and current_task_metadata.structured_dataset_type.columns: columns = [c.name for c in current_task_metadata.structured_dataset_type.columns] try: fs = FSSpecPersistence.get_filesystem(uri) diff --git a/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/pandas.py b/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/pandas.py index 490cd5ef70..07b58d243a 100644 --- a/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/pandas.py +++ b/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/pandas.py @@ -63,7 +63,7 @@ def decode( uri = flyte_value.uri columns = None kwargs = get_storage_options(uri) - if current_task_metadata and current_task_metadata.structured_dataset_type.columns: + if current_task_metadata.structured_dataset_type and current_task_metadata.structured_dataset_type.columns: columns = [c.name for c in current_task_metadata.structured_dataset_type.columns] try: return pd.read_parquet(uri, columns=columns, storage_options=kwargs) diff --git a/plugins/flytekit-spark/flytekitplugins/spark/sd_transformers.py b/plugins/flytekit-spark/flytekitplugins/spark/sd_transformers.py index 6a32457fe1..cd451fa080 100644 --- a/plugins/flytekit-spark/flytekitplugins/spark/sd_transformers.py +++ b/plugins/flytekit-spark/flytekitplugins/spark/sd_transformers.py @@ -42,7 +42,7 @@ def decode( current_task_metadata: StructuredDatasetMetadata, ) -> DataFrame: user_ctx = FlyteContext.current_context().user_space_params - if current_task_metadata and current_task_metadata.structured_dataset_type.columns: + if current_task_metadata.structured_dataset_type and current_task_metadata.structured_dataset_type.columns: columns = [c.name for c in current_task_metadata.structured_dataset_type.columns] return user_ctx.spark_session.read.parquet(flyte_value.uri).select(*columns) return user_ctx.spark_session.read.parquet(flyte_value.uri)