Migrate "Types" tutorial doc to CRAG #5246

Merged 2 commits on Oct 19, 2021.
`docs/content-crag/tutorial/advanced-tutorial/types.mdx` (66 changes: 32 additions & 34 deletions)
---
title: "Advanced: Dagster Types | Dagster"
description: Besides Python 3's typing system, Dagster provides a type system that helps users describe what kind of values their ops accept and produce.
---

# Advanced: Dagster Types

<CodeReferenceLink filePath="examples/docs_snippets_crag/docs_snippets_crag/intro_tutorial/basics/e04_quality/" />

## Verifying Op Outputs and Inputs

Dagster lets developers express what they expect their op inputs and outputs to look like through [Dagster Types](/_apidocs/types).

The Dagster type system is gradual and optional: jobs can run without explicitly specified types, and specifying types in some places doesn't require specifying them everywhere.

Dagster type-checking happens at op execution time - each type defines a `type_check_fn` that knows how to check whether values match what it expects.

- When a type is specified for an op's input, then the type check occurs immediately before the op is executed.
- When a type is specified for an op's output, then the type check occurs immediately after the op is executed.
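The timing described above can be sketched in plain Python. This is an illustrative stand-in for Dagster's machinery, not its actual implementation: a check function runs on the input value immediately before the body executes, and on the output value immediately after.

```python
def is_list_of_dicts(_context, value):
    # Example check function: accepts only a list whose elements are dicts.
    return isinstance(value, list) and all(isinstance(e, dict) for e in value)


def run_with_checks(fn, value, in_check=None, out_check=None):
    # Check the input immediately before executing the body...
    if in_check is not None and not in_check(None, value):
        raise TypeError("input failed type check")
    result = fn(value)
    # ...and check the output immediately after.
    if out_check is not None and not out_check(None, result):
        raise TypeError("output failed type check")
    return result


rows = run_with_checks(
    lambda cereals: sorted(cereals, key=lambda c: c["calories"]),
    [{"name": "Corn Flakes", "calories": 100}, {"name": "Bran", "calories": 70}],
    in_check=is_list_of_dicts,
    out_check=is_list_of_dicts,
)
print(rows[0]["name"])  # prints "Bran"
```

Because both hooks are optional, leaving `in_check`/`out_check` as `None` mirrors the gradual nature of the real system: untyped steps simply run unchecked.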

Let's look back at our simple `download_csv` op.

```python file=/intro_tutorial/basics/e04_quality/inputs_typed.py startafter=start_inputs_typed_marker_0 endbefore=end_inputs_typed_marker_0
@op
def download_csv(context):
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    context.log.info("Read {n_lines} lines".format(n_lines=len(lines)))
    return [row for row in csv.DictReader(lines)]
```

The `lines` object returned by Python's built-in `csv.DictReader` is a list of `OrderedDict`s:

```python
[
    ...
]
```

This is a simple representation of a "data frame", or a table of data. We'd like to be able to use Dagster's type system to type the output of `download_csv`, so that we can do type checking when we construct the job, ensuring that any op consuming the output of `download_csv` expects to receive data in this format.

### Constructing a Dagster Type

To do this, we'll construct a <PyObject module="dagster" object="DagsterType" displayText="DagsterType" />:

```python file=/intro_tutorial/basics/e04_quality/custom_types.py startafter=start_custom_types_marker_0 endbefore=end_custom_types_marker_0
def is_list_of_dicts(_, value):
    return isinstance(value, list) and all(isinstance(element, dict) for element in value)


SimpleDataFrame = DagsterType(
    name="SimpleDataFrame",
    type_check_fn=is_list_of_dicts,
)
```
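Before wiring the type into ops, it's worth sanity-checking the check function directly. A quick stdlib-only demonstration (repeating `is_list_of_dicts` here so the snippet is self-contained):

```python
def is_list_of_dicts(_context, value):
    return isinstance(value, list) and all(isinstance(element, dict) for element in value)


# A list of dict rows, as produced by csv.DictReader, passes the check.
assert is_list_of_dicts(None, [{"name": "100% Bran", "calories": "70"}])
assert is_list_of_dicts(None, [])  # all() over an empty iterable is True

# Anything else fails: a bare string, or a list with non-dict elements.
assert not is_list_of_dicts(None, "not a list")
assert not is_list_of_dicts(None, ["not_a_dict"])
```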

Now we can annotate the rest of our job with our new type:

```python file=/intro_tutorial/basics/e04_quality/custom_types.py startafter=start_custom_types_marker_1 endbefore=end_custom_types_marker_1
@op(out=Out(SimpleDataFrame))
def download_csv(context):
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    context.log.info("Read {n_lines} lines".format(n_lines=len(lines)))
    return [row for row in csv.DictReader(lines)]


@op(ins={"cereals": In(SimpleDataFrame)})
def sort_by_calories(context, cereals):
    sorted_cereals = sorted(cereals, key=lambda cereal: cereal["calories"])
    context.log.info(f'Most caloric cereal: {sorted_cereals[-1]["name"]}')
```

The type metadata now appears in Dagit, and the system will ensure that the input and output of this op indeed match the criteria for `SimpleDataFrame`. As usual, run:

```bash
dagit -f custom_types.py
```
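To see the pieces working end to end without a network call or Dagit, here is a stdlib-only walkthrough of what the job does: parse CSV text into dict rows, run the same shape check, and sort by calories. The inline CSV sample is made up for illustration, and unlike the tutorial code this sketch casts the string calorie values to `int` before sorting.

```python
import csv

CSV_TEXT = """name,calories
100% Bran,70
Cap'n'Crunch,120
Corn Flakes,100
"""


def is_list_of_dicts(_context, value):
    return isinstance(value, list) and all(isinstance(element, dict) for element in value)


# download_csv step: DictReader yields one dict per data row.
cereals = [row for row in csv.DictReader(CSV_TEXT.split("\n"))]
assert is_list_of_dicts(None, cereals)  # the check that would run on the output

# sort_by_calories step: CSV values are strings, so compare as ints here.
sorted_cereals = sorted(cereals, key=lambda cereal: int(cereal["calories"]))
print(f'Most caloric cereal: {sorted_cereals[-1]["name"]}')  # Cap'n'Crunch
```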

<Image
  alt="job_custom_types_figure_one.png"
  src="/images/tutorial/job_custom_types_figure_one.png"
  width={2756}
  height={2098}
/>

You can see that the output of `download_csv` (which by default has the name `result`) is marked to be of type `SimpleDataFrame`.

### When Type Checks Fail

Now, if our op logic fails to return the right type, we'll see a type check failure, which will fail the job. Let's replace our `download_csv` op with the following bad logic:

```python file=/intro_tutorial/basics/e04_quality/custom_types_2.py startafter=start_custom_types_2_marker_1 endbefore=end_custom_types_2_marker_1
@op(out=Out(SimpleDataFrame))
def bad_download_csv(context):
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    context.log.info("Read {n_lines} lines".format(n_lines=len(lines)))
    return ["not_a_dict"]
```

When we run the job with this op, we'll see an error in the terminal like:

```bash
2021-10-18 13:15:37 - dagster - ERROR - custom_type_job - 66d26360-84bc-41a3-8848-fba271354673 - 16200 - bad_download_csv - STEP_FAILURE - Execution of step "bad_download_csv" failed.

dagster.core.errors.DagsterTypeCheckDidNotPass: Type check failed for step output "result" - expected type "SimpleDataFrame".
```

We will also see the error message in Dagit:

<Image
  alt="job_custom_types_2_dagit_error_message.png"
  src="/images/tutorial/job_custom_types_2_dagit_error_message.png"
  width={2756}
  height={2098}
/>

> **Reviewer (Contributor):** nit: can we keep the filenames the same?
>
> **Author:** yep, editing

<br />
A <PyObject module="dagster" object="TypeCheck" displayText="TypeCheck" /> must be returned from the type check function in order to attach structured metadata to the result.
Dagit knows how to display and archive structured metadata of this kind for future review:

<Image
  alt="job_custom_types_figure_two.png"
  src="/images/tutorial/job_custom_types_figure_two.png"
  width={2756}
  height={2098}
/>
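The shape of such a structured result can be sketched with plain dataclasses. The field names below mirror Dagster's `TypeCheck` (success flag, description, metadata), but this is a stdlib stand-in, not the real class:

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class FakeTypeCheck:
    # Stand-in for dagster.TypeCheck: a pass/fail flag plus archivable metadata.
    success: bool
    description: str = ""
    metadata: Dict[str, Any] = field(default_factory=dict)


def check_data_frame(rows):
    if not isinstance(rows, list) or not all(isinstance(r, dict) for r in rows):
        return FakeTypeCheck(success=False, description="not a list of dicts")
    return FakeTypeCheck(
        success=True,
        description="passed shape check",
        metadata={
            "row_count": len(rows),
            "fields": sorted(rows[0].keys()) if rows else [],
        },
    )


result = check_data_frame([{"name": "100% Bran", "calories": "70"}])
print(result.metadata)  # {'row_count': 1, 'fields': ['calories', 'name']}
```

The point of returning a structured object rather than a bare boolean is exactly what the screenshot shows: the tooling can persist and render the metadata alongside the pass/fail outcome.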

<br />
**`/intro_tutorial/basics/e04_quality/custom_types.py`**

```python
import csv

import requests
from dagster import DagsterType, In, Out, job, op


# start_custom_types_marker_0
def is_list_of_dicts(_, value):
    return isinstance(value, list) and all(isinstance(element, dict) for element in value)


SimpleDataFrame = DagsterType(
    name="SimpleDataFrame",
    type_check_fn=is_list_of_dicts,
)
# end_custom_types_marker_0

# start_custom_types_marker_1


@op(out=Out(SimpleDataFrame))
def download_csv(context):
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    context.log.info("Read {n_lines} lines".format(n_lines=len(lines)))
    return [row for row in csv.DictReader(lines)]


@op(ins={"cereals": In(SimpleDataFrame)})
def sort_by_calories(context, cereals):
    sorted_cereals = sorted(cereals, key=lambda cereal: cereal["calories"])
    context.log.info(f'Most caloric cereal: {sorted_cereals[-1]["name"]}')


# end_custom_types_marker_1


@job
def custom_type_job():
    sort_by_calories(download_csv())
```
**`/intro_tutorial/basics/e04_quality/custom_types_2.py`**

```python
import requests
from dagster import DagsterType, In, Out, job, op


# start_custom_types_2_marker_0
def is_list_of_dicts(_, value):
    return isinstance(value, list) and all(isinstance(element, dict) for element in value)


SimpleDataFrame = DagsterType(
    name="SimpleDataFrame",
    type_check_fn=is_list_of_dicts,
)
# end_custom_types_2_marker_0

# start_custom_types_2_marker_1
@op(out=Out(SimpleDataFrame))
def bad_download_csv(context):
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    context.log.info("Read {n_lines} lines".format(n_lines=len(lines)))
    return ["not_a_dict"]


# end_custom_types_2_marker_1


@op(ins={"cereals": In(SimpleDataFrame)})
def sort_by_calories(context, cereals):
    sorted_cereals = sorted(cereals, key=lambda cereal: cereal["calories"])
    context.log.info(f'Most caloric cereal: {sorted_cereals[-1]["name"]}')


@job
def custom_type_job():
    sort_by_calories(bad_download_csv())
```
**`/intro_tutorial/basics/e04_quality/custom_types_test.py`** (several hunks of this file were collapsed in the diff view; collapsed spans are marked with `...` below)

```python
from dagster import (
    DagsterType,
    Failure,
    String,
    check_dagster_type,
    dagster_type_loader,
    job,
    op,
)


def less_simple_data_frame_type_check(_, value):
    ...  # initial checks and `fields` setup collapsed in the diff
    for i, row in enumerate(value):
        if not isinstance(row, dict):
            raise Failure(
                "LessSimpleDataFrame should be a list of dicts, "
                "got {type_} for row {idx}".format(type_=type(row), idx=(i + 1))
            )
        row_fields = [field for field in row.keys()]
        if fields != row_fields:
            ...  # failure for mismatched fields collapsed in the diff


@dagster_type_loader(...)  # loader config schema collapsed in the diff
def less_simple_data_frame_loader(context, config):
    ...


LessSimpleDataFrame = DagsterType(
    ...  # constructor arguments collapsed in the diff
)


@op
def sort_by_calories(context, cereals: LessSimpleDataFrame):
    sorted_cereals = sorted(cereals, key=lambda cereal: cereal["calories"])
    context.log.info(
        "Least caloric cereal: {least_caloric}".format(least_caloric=sorted_cereals[0]["name"])
    )
    context.log.info(
        "Most caloric cereal: {most_caloric}".format(most_caloric=sorted_cereals[-1]["name"])
    )


@job
def custom_type_job():
    sort_by_calories()


if __name__ == "__main__":
    custom_type_job.execute_in_process(
        run_config={
            "ops": {"sort_by_calories": {"inputs": {"cereals": {"csv_path": "cereal.csv"}}}}
        },
    )


# start_custom_types_test_marker_0
def test_less_simple_data_frame():
    assert check_dagster_type(LessSimpleDataFrame, [{"foo": 1}, {"foo": 2}]).success

    type_check = check_dagster_type(LessSimpleDataFrame, [{"foo": 1}, {"bar": 2}])
    assert not type_check.success
    assert type_check.description == (
        "Rows in LessSimpleDataFrame should have the same fields, "
        ...  # remainder of the expected message collapsed in the diff
    )
```
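The field-consistency rule that `less_simple_data_frame_type_check` enforces (every row must expose the same fields as the first row) can be sketched standalone. This is a stdlib illustration of the rule, not the Dagster type check itself:

```python
def rows_have_consistent_fields(rows):
    # Compare each row's keys, in order, to the first row's keys.
    if not rows:
        return True
    fields = list(rows[0].keys())
    for i, row in enumerate(rows):
        if not isinstance(row, dict):
            raise TypeError(f"expected a dict for row {i + 1}, got {type(row)}")
        if list(row.keys()) != fields:
            return False
    return True


assert rows_have_consistent_fields([{"foo": 1}, {"foo": 2}])
assert not rows_have_consistent_fields([{"foo": 1}, {"bar": 2}])
```

Note that key *order* matters here, matching the `fields != row_fields` list comparison in the file above; a set comparison would accept reordered columns.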
**`/intro_tutorial/basics/e04_quality/inputs_typed.py`**

```python
import csv

import requests
from dagster import job, op


# start_inputs_typed_marker_0
@op
def download_csv(context):
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    context.log.info("Read {n_lines} lines".format(n_lines=len(lines)))
    return [row for row in csv.DictReader(lines)]


# end_inputs_typed_marker_0


@op
def sort_by_calories(context, cereals):
    sorted_cereals = sorted(cereals, key=lambda cereal: cereal["calories"])
    context.log.info(
        "Least caloric cereal: {least_caloric}".format(least_caloric=sorted_cereals[0]["name"])
    )
    context.log.info(
        "Most caloric cereal: {most_caloric}".format(most_caloric=sorted_cereals[-1]["name"])
    )


@job
def inputs_job():
    sort_by_calories(download_csv())


if __name__ == "__main__":
    result = inputs_job.execute_in_process()
    assert result.success
```