diff --git a/dagshub/data_engine/client/data_client.py b/dagshub/data_engine/client/data_client.py index 1c6ff87a..b6a92525 100644 --- a/dagshub/data_engine/client/data_client.py +++ b/dagshub/data_engine/client/data_client.py @@ -155,7 +155,6 @@ def update_metadata(self, datasource: Datasource, entries: List[DatapointMetadat params = GqlMutations.update_metadata_params( datasource_id=datasource.source.id, datapoints=[e.to_dict() for e in entries] ) - return self._exec(q, params) def update_metadata_fields(self, datasource: Datasource, metadata_field_props: List[MetadataFieldSchema]): diff --git a/dagshub/data_engine/datasets.py b/dagshub/data_engine/datasets.py index 44126588..acd5e70c 100644 --- a/dagshub/data_engine/datasets.py +++ b/dagshub/data_engine/datasets.py @@ -7,7 +7,6 @@ from dagshub.data_engine.model.datasource import Datasource from dagshub.data_engine.model.datasource_state import DatasourceState from dagshub.data_engine.model.errors import DatasetNotFoundError -from dagshub.data_engine.model.query import DatasourceQuery def get_datasets(repo: str) -> List[Datasource]: @@ -41,7 +40,7 @@ def _from_gql_result(repo: str, dataset_result: "DatasetResult") -> "Datasource" ds = Datasource(DatasourceState.from_gql_result(repo, dataset_result.datasource)) query_dict = json.loads(dataset_result.datasetQuery) - if "query" in query_dict: - ds._query = DatasourceQuery.deserialize(query_dict["query"]) + + ds._deserialize_gql_result(query_dict) return ds diff --git a/dagshub/data_engine/model/datasource.py b/dagshub/data_engine/model/datasource.py index 51d128e3..a35af506 100644 --- a/dagshub/data_engine/model/datasource.py +++ b/dagshub/data_engine/model/datasource.py @@ -1,4 +1,5 @@ import base64 +import datetime import gzip import json import logging @@ -61,15 +62,54 @@ class DatapointMetadataUpdateEntry(json.JSONEncoder): allowMultiple: bool = False +@dataclass +class Field: + column: str + as_of_time: Optional[Union[float, datetime.datetime]] = None + alias: Optional[str] = None + + @property + def as_of_timestamp(self) -> Optional[int]: + if self.as_of_time is not None: + if isinstance(self.as_of_time, datetime.datetime): + return int(self.as_of_time.timestamp()) + else: + return int(self.as_of_time) + else: + return None + + def to_dict(self, ds: "Datasource") -> Dict[str, Any]: + if not ds.has_field(self.column): + raise FieldNotFoundError(self.column) + + res_dict = {"name": self.column} + if self.as_of_time is not None: + res_dict["asOf"] = self.as_of_timestamp + if self.alias: + res_dict["alias"] = self.alias + return res_dict + + class Datasource: - def __init__(self, datasource: "DatasourceState", query: Optional[DatasourceQuery] = None): + def __init__(self, datasource: "DatasourceState", query: Optional[DatasourceQuery] = None, select=None, as_of=None): self._source = datasource if query is None: query = DatasourceQuery() self._query = query - + self._select = select or [] + self._global_as_of = as_of self.serialize_gql_query_input() + @property + def global_as_of_timestamp(self) -> Optional[int]: + if self._global_as_of is not None: + if isinstance(self._global_as_of, datetime.datetime): + return int(self._global_as_of.timestamp()) + else: + return int(self._global_as_of) + else: + return None + @property def source(self) -> "DatasourceState": return self._source @@ -82,7 +122,7 @@ def clear_query(self): self._query = DatasourceQuery() def __deepcopy__(self, memodict={}) -> "Datasource": - res = Datasource(self._source, self._query.__deepcopy__()) + res = 
Datasource(self._source, self._query.__deepcopy__(), self._select, self._global_as_of) return res def get_query(self): @@ -93,9 +133,20 @@ def annotation_fields(self) -> List[str]: return [f.name for f in self.fields if f.is_annotation()] def serialize_gql_query_input(self): - return { + result = { "query": self._query.serialize_graphql(), } + if self._select: + result["select"] = self._select + + if self.global_as_of_timestamp: + result["asOf"] = self.global_as_of_timestamp + return result + + def _deserialize_gql_result(self, query_dict): + if "query" in query_dict: + self._query = DatasourceQuery.deserialize(query_dict["query"]) + self._select = query_dict.get("select") def sample(self, start: Optional[int] = None, end: Optional[int] = None): if start is not None: @@ -120,6 +171,48 @@ def all(self) -> "QueryResult": self._check_preprocess() return self._source.client.get_datapoints(self) + def select(self, *selected: Union[str, Field]): + """ + using select() you can choose which columns will appear on the query result, + what their names will be (alias) and from what time. For example: + t = datetime.now(timezone.utc) - timedelta(hours=24) + q1 = (ds["size"] > 5).select(Field("size", as_of_time=t, alias="size_asof_24h_ago"), Field("episode")) + """ + new_ds = self.__deepcopy__() + + include_all = False + for s in selected: + if isinstance(s, Field): + new_ds._select.append(s.to_dict(self)) + else: + if s != '*': + new_ds._select.append({"name": s}) + else: + include_all = True + + if include_all: + aliases = [s["alias"] for s in new_ds._select if "alias" in s] + for f in self.fields: + if f.name in aliases: + raise ValueError(f"alias {f.name} can't be used, a column with that name exists") + new_ds._select.append({"name": f.name}) + return new_ds + + def as_of(self, time: Union[float, datetime.datetime]): + """ + as_of() applied on query allows you to view a snapshot of datapoint/enrichments. For example: + + t = datetime.now(timezone.utc) - timedelta(hours=24) + q1 = (ds["size"] > 5).as_of(t) + + in the above example all datapoints whose creation time is no later than 't', + and that match the condition at 't' - are returned. 
+ """ + new_ds = self.__deepcopy__() + + new_ds._global_as_of = time + return new_ds + def _check_preprocess(self): self.source.get_from_dagshub() if ( @@ -499,7 +592,7 @@ def __repr__(self): queried_ds = ds[ds["value"] == 5] """ - def __getitem__(self, other: Union[slice, str, "Datasource"]): + def __getitem__(self, other: Union[slice, str, "Datasource", "Field"]): # Slicing - get items from the slice if type(other) is slice: return self.sample(other.start, other.stop) @@ -515,6 +608,11 @@ def __getitem__(self, other: Union[slice, str, "Datasource"]): else: new_ds._query.compose("and", other_query) return new_ds + elif type(other) is Field: + if not self.has_field(other.column): + raise FieldNotFoundError(other.column) + new_ds._query = DatasourceQuery(other.column, other.as_of_timestamp) + return new_ds # "index" is a datasource with a query - return the datasource inside # Example: # ds = Dataset() diff --git a/dagshub/data_engine/model/query.py b/dagshub/data_engine/model/query.py index 6709950f..c9919e1e 100644 --- a/dagshub/data_engine/model/query.py +++ b/dagshub/data_engine/model/query.py @@ -54,14 +54,20 @@ class FieldFilterOperand(enum.Enum): class DatasourceQuery: - def __init__(self, column_or_query: Optional[Union[str, "DatasourceQuery"]] = None): + def __init__(self, column_or_query: Optional[Union[str, "DatasourceQuery"]] = None, as_of: Optional[int] = None): self._operand_tree: Tree = Tree() if type(column_or_query) is str: # If it's ds["column"] then the root node is just the column name, will be filled later - self._operand_tree.create_node(UNFILLED_NODE_TAG, data={"field": column_or_query}) + data = {"field": column_or_query} + if as_of: + data["as_of"] = int(as_of) + self._operand_tree.create_node(UNFILLED_NODE_TAG, data=data) elif column_or_query is not None: self._operand_tree.create_node(column_or_query) + if as_of: + self._as_of = as_of + def __repr__(self): if self.is_empty: return "Query: empty" @@ -142,6 +148,8 @@ def _serialize_node(node: Node, tree: Tree) -> Dict: raise WrongOperatorError(f"Operator {operand} is not supported") key = node.data["field"] value = node.data["value"] + as_of = node.data.get("as_of") + value_type = metadataTypeLookup[type(value)].value if type(value) is bytes: # TODO: this will need to probably be changed when we allow actual binary field comparisons @@ -153,7 +161,7 @@ def _serialize_node(node: Node, tree: Tree) -> Dict: f"Value type {value_type} is not supported for querying.\r\n" f"Supported types: {list(metadataTypeLookup.keys())}" ) - return { + res = { "filter": { "key": key, "value": str(value), @@ -161,6 +169,9 @@ def _serialize_node(node: Node, tree: Tree) -> Dict: "comparator": query_op.value, } } + if as_of: + res["filter"]["asOf"] = as_of + return res @staticmethod def deserialize(serialized_query: Dict) -> "DatasourceQuery": @@ -192,7 +203,8 @@ def _deserialize_node(node_dict: Dict, tree: Tree, parent_node=None) -> None: value_type = metadataTypeLookupReverse[val["valueType"]] converter = _metadataTypeCustomConverters.get(value_type, lambda x: value_type(x)) value = converter(val["value"]) - node = Node(tag=comparator, data={"field": key, "value": value}) + as_of = val.get("asOf") + node = Node(tag=comparator, data={"field": key, "value": value, "as_of": as_of}) tree.add_node(node, parent_node) elif op_type in ("and", "or"): main_node = Node(tag=op_type) diff --git a/docs/data_engine.md b/docs/data_engine.md index 0ae53d05..d4ff21a6 100644 --- a/docs/data_engine.md +++ b/docs/data_engine.md @@ -281,6 +281,59 @@ 
filtered_ds = ds[ds["episode"] > 5] filtered_ds2 = filtered_ds[filtered_ds["has_baby_yoda"] == True] ``` +### Versioning +#### Query filtering: +An extended syntax lets you query according to different versions of enrichments. For example: +```python +# size metadata is constantly changed and we want to address the one from 24h ago +t = datetime.now(timezone.utc) - timedelta(hours=24) + +q1 = ds[Field("size", as_of_time=t] > 5 +``` +in the above example all datapoints whose "size" column updated no later than 't' that match the condition '>5' are returned. + + +#### Query select: +Using select() you can choose which columns will appear on the query result, what their names will be (alias) and from what time. For example: + +```python +t = datetime.now(timezone.utc) - timedelta(hours=24) + +q1 = (ds["size"] > 5).select(Field("size", as_of_time=t, alias="size_asof_24h_ago"), Field("episode")) +``` +in the above example the result set of datapoints will have 2 columns of metadata: "size_asof_24h_ago" and "episode". +all other metadata columns are ommited.if the desired result is to get all metadata columns and in addition the selected list, +add "*" to the list, example: +```python +q1 = (ds["size"] > 5).select(Field("size", as_of_time=t, alias="size_asof_24h_ago"), "*") +``` + +#### Global as_of time: +Using as_of() applied on query allows you to view a snapshot of datapoint/enrichments. For example: + +```python +t = datetime.now(timezone.utc) - timedelta(hours=24) + +q1 = (ds["size"] > 5).as_of(t) +``` +in the above example all datapoints whose creation time is no later than 't' and that match the condition at 't' - are returned. + + +#### Notes and limitations: + +##### Time parameter: +- the time parameter can be POSIX timestamp or datetime object +- pay attention to timezones - use timestamp if known, or relative datetime if known (as in the above examples). if you use a specific date such as `dateutil.parser.parse("Tue 28 Nov 11:29 +2:00")` specify the utc delta as shown here, otherwise this date can translate to different timestamps in the machine that runs the client and in dagshub backend. +##### Select list: +- both "x" and `Field("x")` can be used +- alias, as_of_time - are optional +- we currently do not check the list for contradictions/overwrites/duplications, i.e `select(Field("x", as_of_time=t1), Field("x", as_of_time=t2))` does not make sense since there is no alias to differentiate, the result will not reflect the intention. also `select("x","x")` +##### Global as_of behavior: +- it applies to all entities unless otherwise specified, i.e if we use Field("x", as_of_time=t1)) then t1 will precede over a t2 specified in .as_of(t2). the sensibility of the results is up to the caller. you could get datapoints that existed in t1 < t2 based on a condition applied on their enrichments in t2. + + + + ## Saving queries You can save the query you have on the datasource. 
diff --git a/tests/data_engine/test_querying.py b/tests/data_engine/test_querying.py index da57c813..4afbaadf 100644 --- a/tests/data_engine/test_querying.py +++ b/tests/data_engine/test_querying.py @@ -1,7 +1,8 @@ +import dateutil.parser import pytest from dagshub.data_engine.model.datasource import ( - Datasource, + Datasource, Field, ) from dagshub.data_engine.model.errors import WrongOrderError, DatasetFieldComparisonError, FieldNotFoundError from dagshub.data_engine.model.query import DatasourceQuery, bytes_deserializer @@ -27,6 +28,84 @@ def test_simple_filter(ds): assert q.to_dict() == expected +def test_versioning_query_datetime(ds): + add_int_fields(ds, "x") + # datetime + ds2 = ds[ds[Field("x", as_of_time=dateutil.parser.parse("Wed 22 Nov 2023"))] > 1] + q = ds2.get_query() + assert q.to_dict() == {'gt': {'data': {'as_of': int(dateutil.parser.parse("Wed 22 Nov 2023").timestamp()), + 'field': 'x', 'value': 1}}} + + +def test_versioning_query_timestamp(ds): + add_int_fields(ds, "x") + # timestamp + ds2 = ds[ds[Field("x", as_of_time=1700604000)] > 1] + q = ds2.get_query() + assert q.to_dict() == {'gt': {'data': {'as_of': 1700604000, 'field': 'x', 'value': 1}}} + + +def test_versioning_select(ds): + add_int_fields(ds, "x") + add_int_fields(ds, "y") + add_int_fields(ds, "z") + + # test select + ds2 = ((ds[ds[Field("x", as_of_time=123.99)] > 1]) & + (ds[ds[Field("x", as_of_time=345)] > 2]) | + (ds[ds[Field("y", as_of_time=789)] > 3])). \ + select(Field("y", as_of_time=123), Field("x", as_of_time=456, alias="y_t1")) + q = ds2.get_query() + expected = {'or': {'children': [{'and': {'children': [{'gt': {'data': {'field': 'x', 'as_of': 123, 'value': 1}}}, + {'gt': {'data': {'field': 'x', 'as_of': 345, 'value': 2}}}], + 'data': None}}, + {'gt': {'data': {'field': 'y', 'as_of': 789, 'value': 3}}}], 'data': None}} + assert q.to_dict() == expected + + # test serialization works and includes select + expected_serialized = {'query': {'or': [{'and': [ + {'filter': {'key': 'x', 'value': '1', 'valueType': 'INTEGER', 'comparator': 'GREATER_THAN', 'asOf': 123}}, + {'filter': {'key': 'x', 'value': '2', 'valueType': 'INTEGER', 'comparator': 'GREATER_THAN', 'asOf': 345}}]}, { + 'filter': {'key': 'y', 'value': '3', 'valueType': 'INTEGER', + 'comparator': 'GREATER_THAN', 'asOf': 789}}]}, + 'select': [{'name': 'y', 'asOf': 123}, {'name': 'x', 'asOf': 456, 'alias': 'y_t1'}]} + assert ds2.serialize_gql_query_input() == expected_serialized + + +def test_versioning_select_as_strings(ds): + add_int_fields(ds, "x") + add_int_fields(ds, "y") + add_int_fields(ds, "z") + + ds2 = (ds[ds["x"] > 1]).select("y", "z") + print(ds2.serialize_gql_query_input()) + assert ds2.serialize_gql_query_input() == { + 'query': {'filter': {'key': 'x', 'value': '1', 'valueType': 'INTEGER', 'comparator': 'GREATER_THAN'}}, + 'select': [{'name': 'y'}, {'name': 'z'}]} + + ds2 = (ds[ds["x"] > 1]).select("y", Field("x"), "z") + print(ds2.serialize_gql_query_input()) + assert ds2.serialize_gql_query_input() == { + 'query': {'filter': {'key': 'x', 'value': '1', 'valueType': 'INTEGER', 'comparator': 'GREATER_THAN'}}, + 'select': [{'name': 'y'}, {'name': 'x'}, {'name': 'z'}]} + + ds2 = (ds[ds["x"] > 1]).select("y", Field("x", as_of_time=1234), "z") + print(ds2.serialize_gql_query_input()) + assert ds2.serialize_gql_query_input() == { + 'query': {'filter': {'key': 'x', 'value': '1', 'valueType': 'INTEGER', 'comparator': 'GREATER_THAN'}}, + 'select': [{'name': 'y'}, {'name': 'x', 'asOf': 1234}, {'name': 'z'}]} + + +def 
test_versioning_dataset_deserialize(ds): + # test de-serialization works and includes select + query = {'select': [{'name': 'x', 'asOf': 1700651566}, {'name': 'y', 'alias': 'y_t1', 'asOf': 1700651563}], + 'query': {'filter': {'key': 'x', 'value': 'dogs', 'valueType': 'STRING', 'comparator': 'EQUAL', + 'asOf': 1700651563}}} + + ds._deserialize_gql_result(query) + assert ds._select == [{'name': 'x', 'asOf': 1700651566}, {'name': 'y', 'alias': 'y_t1', 'asOf': 1700651563}] + + def test_composite_filter(ds): add_int_fields(ds, "col1", "col2") ds2 = ds[(ds["col1"] > 5) & (ds["col2"] <= 3)]