diff --git a/python_modules/dagster/dagster/_core/definitions/metadata/__init__.py b/python_modules/dagster/dagster/_core/definitions/metadata/__init__.py index 175503943ef05..2a49d7c1763a0 100644 --- a/python_modules/dagster/dagster/_core/definitions/metadata/__init__.py +++ b/python_modules/dagster/dagster/_core/definitions/metadata/__init__.py @@ -17,6 +17,7 @@ cast, ) +from pydantic import Field from typing_extensions import Self, TypeAlias, TypeVar import dagster._check as check @@ -614,129 +615,124 @@ def null() -> "NullMetadataValue": @whitelist_for_serdes(storage_name="TextMetadataEntryData") -class TextMetadataValue( - NamedTuple( - "_TextMetadataValue", - [ - ("text", PublicAttr[Optional[str]]), - ], - ), - MetadataValue[str], -): +class TextMetadataValue(DagsterModel, MetadataValue[str]): """Container class for text metadata entry data. Args: text (Optional[str]): The text data. """ - def __new__(cls, text: Optional[str]): - return super(TextMetadataValue, cls).__new__( - cls, check.opt_str_param(text, "text", default="") - ) + text_inner: Optional[str] = Field(..., alias="text") + + def __init__(self, text: Optional[str]): + super().__init__(text=text or "") + + @public + @property + def text(self) -> Optional[str]: + return self.text_inner @public @property def value(self) -> Optional[str]: """Optional[str]: The wrapped text data.""" - return self.text + return self.text_inner @whitelist_for_serdes(storage_name="UrlMetadataEntryData") -class UrlMetadataValue( - NamedTuple( - "_UrlMetadataValue", - [ - ("url", PublicAttr[Optional[str]]), - ], - ), - MetadataValue[str], -): +class UrlMetadataValue(DagsterModel, MetadataValue[str]): """Container class for URL metadata entry data. Args: url (Optional[str]): The URL as a string. """ - def __new__(cls, url: Optional[str]): - return super(UrlMetadataValue, cls).__new__( - cls, check.opt_str_param(url, "url", default="") - ) + url_inner: Optional[str] = Field(..., alias="url") + + def __init__(self, url: Optional[str]): + super().__init__(url=url or "") @public @property def value(self) -> Optional[str]: """Optional[str]: The wrapped URL.""" - return self.url + return self.url_inner + + @public + @property + def url(self) -> Optional[str]: + """Optional[str]: The wrapped URL.""" + return self.url_inner @whitelist_for_serdes(storage_name="PathMetadataEntryData") -class PathMetadataValue( - NamedTuple("_PathMetadataValue", [("path", PublicAttr[Optional[str]])]), MetadataValue[str] -): +class PathMetadataValue(DagsterModel, MetadataValue[str]): """Container class for path metadata entry data. Args: path (Optional[str]): The path as a string or conforming to os.PathLike. """ - def __new__(cls, path: Optional[Union[str, os.PathLike]]): - return super(PathMetadataValue, cls).__new__( - cls, check.opt_path_param(path, "path", default="") - ) + path_inner: Optional[str] = Field(..., alias="path") + + def __init__(self, path: Optional[Union[str, os.PathLike]]): + super().__init__(path=check.opt_path_param(path, "path", default="")) + + @public + @property + def path(self) -> Optional[str]: + return self.path_inner @public @property def value(self) -> Optional[str]: """Optional[str]: The wrapped path.""" - return self.path + return self.path_inner @whitelist_for_serdes(storage_name="NotebookMetadataEntryData") -class NotebookMetadataValue( - NamedTuple("_NotebookMetadataValue", [("path", PublicAttr[Optional[str]])]), MetadataValue[str] -): +class NotebookMetadataValue(DagsterModel, MetadataValue[str]): """Container class for notebook metadata entry data. Args: path (Optional[str]): The path to the notebook as a string or conforming to os.PathLike. """ - def __new__(cls, path: Optional[Union[str, os.PathLike]]): - return super(NotebookMetadataValue, cls).__new__( - cls, check.opt_path_param(path, "path", default="") - ) + path_inner: Optional[str] = Field(..., alias="path") + + def __init__(self, path: Optional[Union[str, os.PathLike]]): + super().__init__(path=check.opt_path_param(path, "path", default="")) + + @public + @property + def path(self) -> Optional[str]: + return self.path_inner @public @property def value(self) -> Optional[str]: """Optional[str]: The wrapped path to the notebook as a string.""" - return self.path + return self.path_inner @whitelist_for_serdes(storage_name="JsonMetadataEntryData") -class JsonMetadataValue( - NamedTuple( - "_JsonMetadataValue", - [ - ("data", PublicAttr[Optional[Union[Sequence[Any], Mapping[str, Any]]]]), - ], - ), - MetadataValue[Union[Sequence[Any], Mapping[str, Any]]], -): +class JsonMetadataValue(DagsterModel, MetadataValue[Union[Sequence[Any], Mapping[str, Any]]]): """Container class for JSON metadata entry data. Args: data (Union[Sequence[Any], Dict[str, Any]]): The JSON data. """ - def __new__(cls, data: Optional[Union[Sequence[Any], Mapping[str, Any]]]): + data: PublicAttr[Optional[Union[Sequence[Any], Mapping[str, Any]]]] + + def __init__(self, data: Optional[Union[Sequence[Any], Mapping[str, Any]]]): data = check.opt_inst_param(data, "data", (Sequence, Mapping)) try: # check that the value is JSON serializable seven.dumps(data) except TypeError: raise DagsterInvalidMetadata("Value is not JSON serializable.") - return super(JsonMetadataValue, cls).__new__(cls, data) + super().__init__(data=data) @public @property @@ -746,25 +742,17 @@ def value(self) -> Optional[Union[Sequence[Any], Mapping[str, Any]]]: @whitelist_for_serdes(storage_name="MarkdownMetadataEntryData") -class MarkdownMetadataValue( - NamedTuple( - "_MarkdownMetadataValue", - [ - ("md_str", PublicAttr[Optional[str]]), - ], - ), - MetadataValue[str], -): +class MarkdownMetadataValue(DagsterModel, MetadataValue[str]): """Container class for markdown metadata entry data. Args: md_str (Optional[str]): The markdown as a string. """ - def __new__(cls, md_str: Optional[str]): - return super(MarkdownMetadataValue, cls).__new__( - cls, check.opt_str_param(md_str, "md_str", default="") - ) + md_str: PublicAttr[Optional[str]] + + def __init__(self, md_str: Optional[str]): + super().__init__(md_str=md_str or "") @public @property @@ -775,16 +763,7 @@ def value(self) -> Optional[str]: # This should be deprecated or fixed so that `value` does not return itself. @whitelist_for_serdes(storage_name="PythonArtifactMetadataEntryData") -class PythonArtifactMetadataValue( - NamedTuple( - "_PythonArtifactMetadataValue", - [ - ("module", PublicAttr[str]), - ("name", PublicAttr[str]), - ], - ), - MetadataValue["PythonArtifactMetadataValue"], -): +class PythonArtifactMetadataValue(DagsterModel, MetadataValue["PythonArtifactMetadataValue"]): """Container class for python artifact metadata entry data. Args: @@ -792,10 +771,11 @@ class PythonArtifactMetadataValue( name (str): The name of the python artifact """ - def __new__(cls, module: str, name: str): - return super(PythonArtifactMetadataValue, cls).__new__( - cls, check.str_param(module, "module"), check.str_param(name, "name") - ) + module: PublicAttr[str] + name: PublicAttr[str] + + def __init__(self, module: str, name: str): + super().__init__(module=module, name=name) @public @property @@ -805,96 +785,93 @@ def value(self) -> Self: @whitelist_for_serdes(storage_name="FloatMetadataEntryData") -class FloatMetadataValue( - NamedTuple( - "_FloatMetadataValue", - [ - ("value", PublicAttr[Optional[float]]), - ], - ), - MetadataValue[float], -): +class FloatMetadataValue(DagsterModel, MetadataValue[float]): """Container class for float metadata entry data. Args: value (Optional[float]): The float value. """ - def __new__(cls, value: Optional[float]): - return super(FloatMetadataValue, cls).__new__(cls, check.opt_float_param(value, "value")) + value_inner: Optional[float] = Field(..., alias="value") + + def __init__(self, value: Optional[float]): + super().__init__(value=value) + + @public + @property + def value(self) -> Optional[float]: + return self.value_inner @whitelist_for_serdes(storage_name="IntMetadataEntryData") -class IntMetadataValue( - NamedTuple( - "_IntMetadataValue", - [ - ("value", PublicAttr[Optional[int]]), - ], - ), - MetadataValue[int], -): +class IntMetadataValue(DagsterModel, MetadataValue[int]): """Container class for int metadata entry data. Args: value (Optional[int]): The int value. """ - def __new__(cls, value: Optional[int]): - return super(IntMetadataValue, cls).__new__(cls, check.opt_int_param(value, "value")) + value_inner: Optional[int] = Field(..., alias="value") + + def __init__(self, value: Optional[int]): + super().__init__(value=value) + + @public + @property + def value(self) -> Optional[int]: + return self.value_inner @whitelist_for_serdes(storage_name="BoolMetadataEntryData") -class BoolMetadataValue( - NamedTuple("_BoolMetadataValue", [("value", PublicAttr[Optional[bool]])]), - MetadataValue[bool], -): +class BoolMetadataValue(DagsterModel, MetadataValue[bool]): """Container class for bool metadata entry data. Args: value (Optional[bool]): The bool value. """ - def __new__(cls, value: Optional[bool]): - return super(BoolMetadataValue, cls).__new__(cls, check.opt_bool_param(value, "value")) + value_inner: Optional[bool] = Field(..., alias="value") + + def __init__(self, value: Optional[bool]): + super().__init__(value=value) + + @public + @property + def value(self) -> Optional[bool]: + return self.value_inner @whitelist_for_serdes -class TimestampMetadataValue( - NamedTuple( - "_DateTimeMetadataValue", - [("value", PublicAttr[float])], - ), - MetadataValue[float], -): +class TimestampMetadataValue(DagsterModel, MetadataValue[float]): """Container class for metadata value that's a unix timestamp. Args: value (float): Seconds since the unix epoch. """ - def __new__(cls, value: float): - return super(TimestampMetadataValue, cls).__new__(cls, check.float_param(value, "value")) + value_inner: Optional[float] = Field(..., alias="value") + + def __init__(self, value: float): + super().__init__(value=value) + + @public + @property + def value(self) -> Optional[float]: + return self.value_inner @whitelist_for_serdes(storage_name="DagsterPipelineRunMetadataEntryData") -class DagsterRunMetadataValue( - NamedTuple( - "_DagsterRunMetadataValue", - [ - ("run_id", PublicAttr[str]), - ], - ), - MetadataValue[str], -): +class DagsterRunMetadataValue(DagsterModel, MetadataValue[str]): """Representation of a dagster run. Args: run_id (str): The run id """ - def __new__(cls, run_id: str): - return super(DagsterRunMetadataValue, cls).__new__(cls, check.str_param(run_id, "run_id")) + run_id: PublicAttr[str] + + def __init__(self, run_id: str): + super().__init__(run_id=run_id) @public @property @@ -904,17 +881,7 @@ def value(self) -> str: @whitelist_for_serdes -class DagsterJobMetadataValue( - NamedTuple( - "_DagsterJobMetadataValue", - [ - ("job_name", PublicAttr[str]), - ("location_name", PublicAttr[str]), - ("repository_name", PublicAttr[Optional[str]]), - ], - ), - MetadataValue["DagsterJobMetadataValue"], -): +class DagsterJobMetadataValue(DagsterModel, MetadataValue["DagsterJobMetadataValue"]): """Representation of a dagster run. Args: @@ -924,17 +891,20 @@ class DagsterJobMetadataValue( assumed to be in the same repository as this object. """ - def __new__( - cls, + job_name: PublicAttr[str] + location_name: PublicAttr[str] + repository_name: PublicAttr[Optional[str]] + + def __init__( + self, job_name: str, location_name: str, repository_name: Optional[str] = None, ): - return super(DagsterJobMetadataValue, cls).__new__( - cls, - check.str_param(job_name, "job_name"), - check.str_param(location_name, "location_name"), - check.opt_str_param(repository_name, "repository_name"), + super().__init__( + job_name=job_name, + location_name=location_name, + repository_name=repository_name, ) @public @@ -944,22 +914,17 @@ def value(self) -> Self: @whitelist_for_serdes(storage_name="DagsterAssetMetadataEntryData") -class DagsterAssetMetadataValue( - NamedTuple("_DagsterAssetMetadataValue", [("asset_key", PublicAttr["AssetKey"])]), - MetadataValue["AssetKey"], -): +class DagsterAssetMetadataValue(DagsterModel, MetadataValue[AssetKey]): """Representation of a dagster asset. Args: asset_key (AssetKey): The dagster asset key """ - def __new__(cls, asset_key: "AssetKey"): - from dagster._core.definitions.events import AssetKey + asset_key: PublicAttr[AssetKey] - return super(DagsterAssetMetadataValue, cls).__new__( - cls, check.inst_param(asset_key, "asset_key", AssetKey) - ) + def __init__(self, asset_key: AssetKey): + super().__init__(asset_key=asset_key) @public @property @@ -971,16 +936,7 @@ def value(self) -> "AssetKey": # This should be deprecated or fixed so that `value` does not return itself. @experimental @whitelist_for_serdes(storage_name="TableMetadataEntryData") -class TableMetadataValue( - NamedTuple( - "_TableMetadataValue", - [ - ("records", PublicAttr[Sequence[TableRecord]]), - ("schema", PublicAttr[TableSchema]), - ], - ), - MetadataValue["TableMetadataValue"], -): +class TableMetadataValue(DagsterModel, MetadataValue["TableMetadataValue"]): """Container class for table metadata entry data. Args: @@ -1001,6 +957,9 @@ class TableMetadataValue( ) """ + records: PublicAttr[Sequence[TableRecord]] + schema_inner: TableSchema = Field(..., alias="schema") + @public @staticmethod def infer_column_type(value: object) -> str: @@ -1014,7 +973,7 @@ def infer_column_type(value: object) -> str: else: return "string" - def __new__(cls, records: Sequence[TableRecord], schema: Optional[TableSchema]): + def __init__(self, records: Sequence[TableRecord], schema: Optional[TableSchema] = None): check.sequence_param(records, "records", of_type=TableRecord) check.opt_inst_param(schema, "schema", TableSchema) @@ -1033,11 +992,12 @@ def __new__(cls, records: Sequence[TableRecord], schema: Optional[TableSchema]): ] ) - return super(TableMetadataValue, cls).__new__( - cls, - records, - schema, - ) + super().__init__(records=records, schema=schema) + + @public + @property + def schema(self) -> TableSchema: + return self.schema_inner @public @property @@ -1047,35 +1007,32 @@ def value(self) -> Self: @whitelist_for_serdes(storage_name="TableSchemaMetadataEntryData") -class TableSchemaMetadataValue( - NamedTuple("_TableSchemaMetadataValue", [("schema", PublicAttr[TableSchema])]), - MetadataValue[TableSchema], -): +class TableSchemaMetadataValue(DagsterModel, MetadataValue[TableSchema]): """Representation of a schema for arbitrary tabular data. Args: schema (TableSchema): The dictionary containing the schema representation. """ - def __new__(cls, schema: TableSchema): - return super(TableSchemaMetadataValue, cls).__new__( - cls, check.inst_param(schema, "schema", TableSchema) - ) + schema_inner: TableSchema = Field(..., alias="schema") + + def __init__(self, schema: TableSchema): + super().__init__(schema=schema) @public @property def value(self) -> TableSchema: """TableSchema: The wrapped :py:class:`TableSchema`.""" - return self.schema + return self.schema_inner + + @public + @property + def schema(self) -> TableSchema: + return self.schema_inner @whitelist_for_serdes -class TableColumnLineageMetadataValue( - NamedTuple( - "_TableColumnLineageMetadataValue", [("column_lineage", PublicAttr[TableColumnLineage])] - ), - MetadataValue[TableColumnLineage], -): +class TableColumnLineageMetadataValue(DagsterModel, MetadataValue[TableColumnLineage]): """Representation of the lineage of column inputs to column outputs of arbitrary tabular data. Args: @@ -1083,20 +1040,20 @@ class TableColumnLineageMetadataValue( for the table. """ - def __new__(cls, column_lineage: TableColumnLineage): - return super(TableColumnLineageMetadataValue, cls).__new__( - cls, check.inst_param(column_lineage, "column_lineage", TableColumnLineage) - ) + column_lineage_inner: TableColumnLineage = Field(..., alias="column_lineage") + + def __init__(self, column_lineage: TableColumnLineage): + super().__init__(column_lineage=column_lineage) @public @property def value(self) -> TableColumnLineage: """TableSpec: The wrapped :py:class:`TableSpec`.""" - return self.column_lineage + return self.column_lineage_inner @whitelist_for_serdes(storage_name="NullMetadataEntryData") -class NullMetadataValue(NamedTuple("_NullMetadataValue", []), MetadataValue[None]): +class NullMetadataValue(DagsterModel, MetadataValue[None]): """Representation of null.""" @public diff --git a/python_modules/dagster/dagster_tests/core_tests/test_metadata.py b/python_modules/dagster/dagster_tests/core_tests/test_metadata.py index 1f30f2a37ef15..c96b2a28e6460 100644 --- a/python_modules/dagster/dagster_tests/core_tests/test_metadata.py +++ b/python_modules/dagster/dagster_tests/core_tests/test_metadata.py @@ -1,4 +1,21 @@ -from dagster import GraphDefinition, NodeInvocation, op +from dagster import ( + AssetKey, + GraphDefinition, + IntMetadataValue, + JsonMetadataValue, + MetadataValue, + NodeInvocation, + TableColumn, + TableColumnDep, + TableColumnLineage, + TableMetadataValue, + TableRecord, + TableSchema, + TableSchemaMetadataValue, + UrlMetadataValue, + op, +) +from dagster._serdes.serdes import deserialize_value, serialize_value def test_op_instance_tags(): @@ -25,3 +42,66 @@ def metadata_op(context): assert result.success assert called["yup"] + + +def test_table_schema_from_name_type_dict(): + assert TableSchema.from_name_type_dict({"foo": "customtype", "bar": "string"}) == TableSchema( + columns=[ + TableColumn(name="foo", type="customtype"), + TableColumn(name="bar", type="string"), + ], + ) + + +def test_table_serialization(): + table_metadata = MetadataValue.table( + records=[ + TableRecord(dict(foo=1, bar=2)), + ], + ) + serialized = serialize_value(table_metadata) + assert deserialize_value(serialized, TableMetadataValue) == table_metadata + + +def test_metadata_value_column_lineage() -> None: + expected_column_lineage = TableColumnLineage( + {"foo": [TableColumnDep(asset_key=AssetKey("bar"), column_name="baz")]} + ) + column_lineage_metadata_value = MetadataValue.column_lineage(expected_column_lineage) + + assert column_lineage_metadata_value.value == expected_column_lineage + + +def test_int_metadata_value(): + assert IntMetadataValue(5).value == 5 + assert IntMetadataValue(value=5).value == 5 + + +def test_url_metadata_value(): + url = "http://dagster.io" + assert UrlMetadataValue(url).value == url + assert UrlMetadataValue(url).url == url + assert UrlMetadataValue(url=url).value == url + + +def test_table_metadata_value(): + records = [TableRecord(dict(foo=1, bar=2))] + schema = TableSchema( + columns=[TableColumn(name="foo", type="int"), TableColumn(name="bar", type="int")] + ) + metadata_val = TableMetadataValue(records, schema=schema) + + assert metadata_val.records == records + assert metadata_val.schema == schema + + +def test_table_schema_metadata_value(): + schema = TableSchema( + columns=[TableColumn(name="foo", type="int"), TableColumn(name="bar", type="int")] + ) + assert TableSchemaMetadataValue(schema).schema == schema + + +def test_json_metadata_value(): + assert JsonMetadataValue({"a": "b"}).data == {"a": "b"} + assert JsonMetadataValue({"a": "b"}).value == {"a": "b"} diff --git a/python_modules/dagster/dagster_tests/execution_tests/test_metadata.py b/python_modules/dagster/dagster_tests/execution_tests/test_metadata.py index 4bc99b6c46669..9c5b51b774960 100644 --- a/python_modules/dagster/dagster_tests/execution_tests/test_metadata.py +++ b/python_modules/dagster/dagster_tests/execution_tests/test_metadata.py @@ -29,7 +29,6 @@ from dagster._core.definitions.metadata import ( DagsterInvalidMetadata, TableColumnLineageMetadataValue, - TableMetadataValue, normalize_metadata, ) from dagster._core.definitions.metadata.table import ( @@ -43,7 +42,6 @@ ) from dagster._core.execution.execution_result import ExecutionResult from dagster._core.snap.node import build_node_defs_snapshot -from dagster._serdes.serdes import deserialize_value, serialize_value def step_events_of_type(result: ExecutionResult, node_name: str, event_type: DagsterEventType): @@ -370,34 +368,6 @@ def test_complex_table_schema(): ) -def test_table_schema_from_name_type_dict(): - assert TableSchema.from_name_type_dict({"foo": "customtype", "bar": "string"}) == TableSchema( - columns=[ - TableColumn(name="foo", type="customtype"), - TableColumn(name="bar", type="string"), - ], - ) - - -def test_table_serialization(): - table_metadata = MetadataValue.table( - records=[ - TableRecord(dict(foo=1, bar=2)), - ], - ) - serialized = serialize_value(table_metadata) - assert deserialize_value(serialized, TableMetadataValue) == table_metadata - - -def test_metadata_value_column_lineage() -> None: - expected_column_lineage = TableColumnLineage( - {"foo": [TableColumnDep(asset_key=AssetKey("bar"), column_name="baz")]} - ) - column_lineage_metadata_value = MetadataValue.column_lineage(expected_column_lineage) - - assert column_lineage_metadata_value.value == expected_column_lineage - - def test_bool_metadata_value(): @op(out={}) def the_op():