From b5d67bf209fea2679b6eb343052d57642c3c5ae0 Mon Sep 17 00:00:00 2001 From: Samhita Alla Date: Sat, 4 Sep 2021 17:49:36 +0530 Subject: [PATCH] Fix GE Task Examples (#389) Signed-off-by: Samhita Alla --- .../greatexpectations/task_example.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/cookbook/integrations/flytekit_plugins/greatexpectations/task_example.py b/cookbook/integrations/flytekit_plugins/greatexpectations/task_example.py index 6eb1e9d54d..82fe15c1f8 100644 --- a/cookbook/integrations/flytekit_plugins/greatexpectations/task_example.py +++ b/cookbook/integrations/flytekit_plugins/greatexpectations/task_example.py @@ -50,6 +50,10 @@ # Next, we define a task that validates the data before returning the shape of the DataFrame. @task(limits=Resources(mem="500Mi")) def simple_task(csv_file: str) -> int: + # GreatExpectationsTask returns Great Expectations' checkpoint result. + # You can print the result to know more about the data within it. + # If the data validation fails, this will raise a ValidationError. + result = simple_task_object(dataset=csv_file) df = pd.read_csv(os.path.join("greatexpectations", "data", csv_file)) return df.shape[0] @@ -58,11 +62,6 @@ def simple_task(csv_file: str) -> int: # Finally, we define a workflow. @workflow def simple_wf(dataset: str = DATASET_LOCAL) -> int: - - # GreatExpectationsTask returns Great Expectations' checkpoint result. - # You can print the result to know more about the data within it. - # If the data validation fails, this will return a ValidationError. 
- result = simple_task_object(dataset=dataset) return simple_task(csv_file=dataset) @@ -92,6 +91,7 @@ def simple_wf(dataset: str = DATASET_LOCAL) -> int: def file_task( dataset: CSVFile, ) -> int: + file_task_object(dataset=dataset) return len(pd.read_csv(dataset)) @@ -101,7 +101,6 @@ def file_task( def file_wf( dataset: CSVFile = DATASET_REMOTE, ) -> int: - file_task_object(dataset=dataset) return file_task(dataset=dataset) @@ -134,6 +133,7 @@ def file_wf( # Next, we define a task that validates the data and returns the columns in it. @task(limits=Resources(mem="500Mi")) def schema_task(dataset: pd.DataFrame) -> typing.List[str]: + schema_task_object(dataset=dataset) return list(dataset.columns) @@ -142,7 +142,6 @@ def schema_task(dataset: pd.DataFrame) -> typing.List[str]: @workflow def schema_wf() -> typing.List[str]: df = sql_to_df() - schema_task_object(dataset=df) return schema_task(dataset=df)