versioning #386

Merged
merged 36 commits into from
Dec 6, 2023
Changes from 32 commits
36 commits
f49e289
versioning #1
sdafni Nov 9, 2023
9a3d23c
versioning #2
sdafni Nov 14, 2023
dbc06c9
versioning #3
sdafni Nov 14, 2023
3862024
versioning #3
sdafni Nov 14, 2023
044cc74
versioning #4
sdafni Nov 15, 2023
1b9cc2a
versioning #5
sdafni Nov 15, 2023
2aa9153
versioning #6 dataclass
sdafni Nov 15, 2023
21bf506
versioning - merge master
sdafni Nov 15, 2023
c688a43
versioning - fix tests
sdafni Nov 15, 2023
274008c
pep shit
sdafni Nov 15, 2023
63d49da
not always asof
sdafni Nov 16, 2023
1977320
dataset + unit test
sdafni Nov 19, 2023
5c2574b
pep shit
sdafni Nov 19, 2023
9a8e0c2
fix tests+not always return asof, accept float ts
sdafni Nov 20, 2023
3741742
wrong int() place
sdafni Nov 20, 2023
5626840
cr fixes #1
sdafni Nov 22, 2023
2bb9d0d
cr fixes #2
sdafni Nov 22, 2023
713bca4
cr fixes #3 pep
sdafni Nov 22, 2023
cc82b87
cr fixes #4 ts fix
sdafni Nov 22, 2023
306287a
cr fixes #5 more pep
sdafni Nov 22, 2023
906ffd6
non existing columm
sdafni Nov 22, 2023
40873dc
cr fixes #6
sdafni Nov 23, 2023
00f971a
pep
sdafni Nov 23, 2023
cf09446
global asof api
sdafni Nov 27, 2023
b5207b7
global asof fixes
sdafni Nov 28, 2023
fa62205
source select default fix
sdafni Nov 28, 2023
146ebc6
functions doc
sdafni Nov 28, 2023
117e7b4
readme doc
sdafni Nov 28, 2023
cb11663
Update data_engine.md
sdafni Nov 28, 2023
557389a
Update data_engine.md
sdafni Nov 28, 2023
889841b
Update data_engine.md
sdafni Nov 28, 2023
27b2857
readme doc #2
sdafni Nov 28, 2023
cf7333c
readme doc #3
sdafni Nov 28, 2023
e77be14
readme doc #4
sdafni Nov 28, 2023
b0f1ee5
include all option
sdafni Nov 29, 2023
6e46ef9
wildcard syntax
sdafni Dec 6, 2023
1 change: 0 additions & 1 deletion dagshub/data_engine/client/data_client.py
@@ -155,7 +155,6 @@ def update_metadata(self, datasource: Datasource, entries: List[DatapointMetadat
params = GqlMutations.update_metadata_params(
datasource_id=datasource.source.id, datapoints=[e.to_dict() for e in entries]
)

return self._exec(q, params)

def update_metadata_fields(self, datasource: Datasource, metadata_field_props: List[MetadataFieldSchema]):
5 changes: 2 additions & 3 deletions dagshub/data_engine/datasets.py
@@ -7,7 +7,6 @@
from dagshub.data_engine.model.datasource import Datasource
from dagshub.data_engine.model.datasource_state import DatasourceState
from dagshub.data_engine.model.errors import DatasetNotFoundError
from dagshub.data_engine.model.query import DatasourceQuery


def get_datasets(repo: str) -> List[Datasource]:
@@ -41,7 +40,7 @@ def _from_gql_result(repo: str, dataset_result: "DatasetResult") -> "Datasource"
ds = Datasource(DatasourceState.from_gql_result(repo, dataset_result.datasource))

query_dict = json.loads(dataset_result.datasetQuery)
if "query" in query_dict:
ds._query = DatasourceQuery.deserialize(query_dict["query"])

ds._deserialize_gql_result(query_dict)

return ds
93 changes: 88 additions & 5 deletions dagshub/data_engine/model/datasource.py
@@ -1,4 +1,5 @@
import base64
import datetime
import gzip
import json
import logging
@@ -61,15 +62,54 @@ class DatapointMetadataUpdateEntry(json.JSONEncoder):
allowMultiple: bool = False


@dataclass
class Field:
column: str
as_of_time: Optional[Union[float, datetime.datetime]] = None
alias: Optional[str] = None

@property
def as_of_timestamp(self) -> Optional[int]:
if self.as_of_time is not None:
if isinstance(self.as_of_time, datetime.datetime):
return int(self.as_of_time.timestamp())
else:
return int(self.as_of_time)
else:
return None

def to_dict(self, ds: "Datasource") -> Dict[str, Any]:
if not ds.has_field(self.column):
raise FieldNotFoundError(self.column)

res_dict = {"name": self.column}
if self.as_of_time is not None:
res_dict["asOf"] = self.as_of_timestamp
if self.alias:
res_dict["alias"] = self.alias
return res_dict


class Datasource:
def __init__(self, datasource: "DatasourceState", query: Optional[DatasourceQuery] = None):
def __init__(self, datasource: "DatasourceState", query: Optional[DatasourceQuery] = None, select=None, as_of=None):
self._source = datasource
if query is None:
query = DatasourceQuery()
self._query = query

self._select = select or []
self._global_as_of = as_of
self.serialize_gql_query_input()

@property
def global_as_of_timestamp(self) -> Optional[int]:
if self._global_as_of is not None:
if isinstance(self._global_as_of, datetime.datetime):
return int(self._global_as_of.timestamp())
else:
return int(self._global_as_of)
else:
return None

@property
def source(self) -> "DatasourceState":
return self._source
@@ -82,7 +122,7 @@ def clear_query(self):
self._query = DatasourceQuery()

def __deepcopy__(self, memodict={}) -> "Datasource":
res = Datasource(self._source, self._query.__deepcopy__())
res = Datasource(self._source, self._query.__deepcopy__(), self._select, self._global_as_of)
return res

def get_query(self):
@@ -93,9 +133,20 @@ def annotation_fields(self) -> List[str]:
return [f.name for f in self.fields if f.is_annotation()]

def serialize_gql_query_input(self):
return {
result = {
"query": self._query.serialize_graphql(),
}
if self._select:
result["select"] = self._select

if self.global_as_of_timestamp:
result["asOf"] = self.global_as_of_timestamp
return result

def _deserialize_gql_result(self, query_dict):
if "query" in query_dict:
self._query = DatasourceQuery.deserialize(query_dict["query"])
self._select = query_dict.get("select")

def sample(self, start: Optional[int] = None, end: Optional[int] = None):
if start is not None:
@@ -120,6 +171,33 @@ def all(self) -> "QueryResult":
self._check_preprocess()
return self._source.client.get_datapoints(self)

def select(self, *selected: Union[str, Field]):
"""
Using select() you can choose which columns will appear in the query result,
what their names will be (alias), and from what point in time they are taken. For example:
t = datetime.now(timezone.utc) - timedelta(hours=24)
q1 = (ds["size"] > 5).select(Field("size", as_of_time=t, alias="size_asof_24h_ago"), Field("episode"))
"""
new_ds = self.__deepcopy__()

new_ds._select = [s.to_dict(self) if isinstance(s, Field) else {"name": s} for s in selected]
return new_ds

def as_of(self, time: Union[float, datetime.datetime]):
"""
Applying as_of() to a query allows you to view a snapshot of datapoints and their enrichments. For example:

t = datetime.now(timezone.utc) - timedelta(hours=24)
q1 = (ds["size"] > 5).as_of(t)

In the above example, all datapoints whose creation time is no later than 't'
and that matched the condition at 't' are returned.
"""
new_ds = self.__deepcopy__()

new_ds._global_as_of = time
return new_ds

def _check_preprocess(self):
self.source.get_from_dagshub()
if (
@@ -499,7 +577,7 @@ def __repr__(self):
queried_ds = ds[ds["value"] == 5]
"""

def __getitem__(self, other: Union[slice, str, "Datasource"]):
def __getitem__(self, other: Union[slice, str, "Datasource", "Field"]):
# Slicing - get items from the slice
if type(other) is slice:
return self.sample(other.start, other.stop)
@@ -515,6 +593,11 @@ def __getitem__(self, other: Union[slice, str, "Datasource"]):
else:
new_ds._query.compose("and", other_query)
return new_ds
elif type(other) is Field:
if not self.has_field(other.column):
raise FieldNotFoundError(other.column)
new_ds._query = DatasourceQuery(other.column, other.as_of_timestamp)
return new_ds
# "index" is a datasource with a query - return the datasource inside
# Example:
# ds = Dataset()
20 changes: 16 additions & 4 deletions dagshub/data_engine/model/query.py
@@ -54,14 +54,20 @@ class FieldFilterOperand(enum.Enum):


class DatasourceQuery:
def __init__(self, column_or_query: Optional[Union[str, "DatasourceQuery"]] = None):
def __init__(self, column_or_query: Optional[Union[str, "DatasourceQuery"]] = None, as_of: Optional[int] = None):
self._operand_tree: Tree = Tree()
if type(column_or_query) is str:
# If it's ds["column"] then the root node is just the column name, will be filled later
self._operand_tree.create_node(UNFILLED_NODE_TAG, data={"field": column_or_query})
data = {"field": column_or_query}
if as_of:
data["as_of"] = int(as_of)
self._operand_tree.create_node(UNFILLED_NODE_TAG, data=data)
elif column_or_query is not None:
self._operand_tree.create_node(column_or_query)

if as_of:
self._as_of = as_of

def __repr__(self):
if self.is_empty:
return "Query: empty"
@@ -142,6 +148,8 @@ def _serialize_node(node: Node, tree: Tree) -> Dict:
raise WrongOperatorError(f"Operator {operand} is not supported")
key = node.data["field"]
value = node.data["value"]
as_of = node.data.get("as_of")

value_type = metadataTypeLookup[type(value)].value
if type(value) is bytes:
# TODO: this will need to probably be changed when we allow actual binary field comparisons
@@ -153,14 +161,17 @@ def _serialize_node(node: Node, tree: Tree) -> Dict:
f"Value type {value_type} is not supported for querying.\r\n"
f"Supported types: {list(metadataTypeLookup.keys())}"
)
return {
res = {
"filter": {
"key": key,
"value": str(value),
"valueType": value_type,
"comparator": query_op.value,
}
}
if as_of:
res["filter"]["asOf"] = as_of
return res

@staticmethod
def deserialize(serialized_query: Dict) -> "DatasourceQuery":
@@ -192,7 +203,8 @@ def _deserialize_node(node_dict: Dict, tree: Tree, parent_node=None) -> None:
value_type = metadataTypeLookupReverse[val["valueType"]]
converter = _metadataTypeCustomConverters.get(value_type, lambda x: value_type(x))
value = converter(val["value"])
node = Node(tag=comparator, data={"field": key, "value": value})
as_of = val.get("asOf")
node = Node(tag=comparator, data={"field": key, "value": value, "as_of": as_of})
tree.add_node(node, parent_node)
elif op_type in ("and", "or"):
main_node = Node(tag=op_type)
48 changes: 48 additions & 0 deletions docs/data_engine.md
@@ -281,6 +281,54 @@ filtered_ds = ds[ds["episode"] > 5]
filtered_ds2 = filtered_ds[filtered_ds["has_baby_yoda"] == True]
```

### Versioning
#### Query filtering:
An extended syntax lets you query according to different versions of enrichments. For example:
```python
# the size metadata changes constantly and we want to address its value from 24h ago
t = datetime.now(timezone.utc) - timedelta(hours=24)

q1 = ds[ds[Field("size", as_of_time=t)] > 5]
```
In the above example, the value of the "size" column as of 't' (its latest update no later than 't') is matched against the condition '> 5', and the matching datapoints are returned.
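For reference, the unit tests added in this PR show how such a filter is represented internally and when serialized; the snippet below is a rough sketch based on those tests (`ds` is an existing datasource as in the earlier examples, and the column name and threshold are illustrative):

```python
from datetime import datetime, timedelta, timezone

from dagshub.data_engine.model.datasource import Field

t = datetime.now(timezone.utc) - timedelta(hours=24)
filtered = ds[ds[Field("size", as_of_time=t)] > 5]

# The snapshot time is stored on the filter node of the query tree, roughly:
#   {'gt': {'data': {'field': 'size', 'as_of': int(t.timestamp()), 'value': 5}}}
# and is serialized to the backend as an extra "asOf" key on the filter:
#   {'filter': {'key': 'size', 'value': '5', 'valueType': 'INTEGER',
#               'comparator': 'GREATER_THAN', 'asOf': int(t.timestamp())}}
```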


#### Query select:
Using select() you can choose which columns appear in the query result, what their names will be (alias), and from what point in time they are taken. For example:

```python
t = datetime.now(timezone.utc) - timedelta(hours=24)

q1 = (ds["size"] > 5).select(Field("size", as_of_time=t, alias="size_asof_24h_ago"), Field("episode"))
```

#### Global as_of time:
Applying as_of() to a query allows you to view a snapshot of datapoints and their enrichments. For example:

```python
t = datetime.now(timezone.utc) - timedelta(hours=24)

q1 = (ds["size"] > 5).as_of(t)
```
In the above example, all datapoints whose creation time is no later than 't' and that matched the condition at 't' are returned.
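Based on `serialize_gql_query_input` in this PR, a global `as_of` is attached at the top level of the serialized query rather than on an individual filter; a rough sketch (values are illustrative):

```python
from datetime import datetime, timedelta, timezone

t = datetime.now(timezone.utc) - timedelta(hours=24)
q1 = (ds["size"] > 5).as_of(t)

# q1.serialize_gql_query_input() then returns, roughly:
# {
#     "query": {"filter": {"key": "size", "value": "5", "valueType": "INTEGER",
#                          "comparator": "GREATER_THAN"}},
#     "asOf": int(t.timestamp()),
# }
```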


#### Notes and limitations:

##### Time parameter:
- The time parameter can be a POSIX timestamp or a `datetime` object.
- Pay attention to timezones: use a POSIX timestamp if you have one, or a relative `datetime` (as in the examples above). If you use a specific date such as `dateutil.parser.parse("Tue 28 Nov 11:29 +2:00")`, specify the UTC offset as shown here; otherwise the same date can translate to different timestamps on the machine running the client and on the DagsHub backend. A short sketch follows below.
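A sketch of the accepted time forms, assuming `ds` already exists and `Field` is imported as in the tests of this PR (values are illustrative):

```python
from datetime import datetime, timedelta, timezone

import dateutil.parser

from dagshub.data_engine.model.datasource import Field

# Relative, timezone-aware datetime: unambiguous on both the client and the backend
t_relative = datetime.now(timezone.utc) - timedelta(hours=24)

# POSIX timestamp: already absolute, so no timezone ambiguity
t_posix = 1700604000

# Absolute date string: include the UTC offset so parsing is not machine-dependent
t_parsed = dateutil.parser.parse("2023-11-28T11:29:00+02:00")

q1 = ds[ds[Field("size", as_of_time=t_relative)] > 5]
q2 = (ds["size"] > 5).as_of(t_posix)
q3 = (ds["size"] > 5).as_of(t_parsed)
```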
##### Select list:
- both "x" and Field("x") can be used
- alias, as_of_time - are optional
- the list should make sense, i.e ```python.select(Field("x", as_of_time=t1), Field("x", as_of_time=t2))``` does not make sense since there is no alias to differentiate, the result will not reflect the intention. also ```python .select("x","x")```
sdafni marked this conversation as resolved.
Show resolved Hide resolved
- when no select list specified all datapoint enrichements are returned, else only those specified.
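For a select list that does make sense, aliases can distinguish two snapshots of the same column; a sketch following the unit tests in this PR (column names and times are illustrative):

```python
from datetime import datetime, timedelta, timezone

from dagshub.data_engine.model.datasource import Field

t1 = datetime.now(timezone.utc) - timedelta(hours=24)
t2 = datetime.now(timezone.utc) - timedelta(hours=48)

# Two snapshots of "size", each under its own alias, plus "episode" as of now
q = (ds["size"] > 5).select(
    Field("size", as_of_time=t1, alias="size_24h_ago"),
    Field("size", as_of_time=t2, alias="size_48h_ago"),
    "episode",
)
```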
Member (review comment):

Ok, this answered my question. This needs to be written at the very beginning of the function doc basically.

Also we 100% should have a flag to include the regular fields along with the selected ones.
Especially considering the way we write the docs, people might assume that the selected fields will be added to the original ones. That's how it read to me.

##### Global as_of behavior:
- It applies to all entities unless otherwise specified, i.e. if we use `Field("x", as_of_time=t1)` then this t1 will take precedence over a t2 specified in `.as_of(t2)`. The sensibility of the results is up to the caller: you could get datapoints that existed at t1 < t2 based on a condition applied on their enrichments at t2.
Member (review comment):
Also needs some rewording probably.

Fixing typos:

Suggested change
- it applies to all entities unless otherwise specified, i.e if we use ```python Field("x", as_of_time=t1))``` than this t will take precedence over a t2 specified in .as_of(t2). the sensibility of the results is up to the caller. you could get datapoints that existed in t1 < t2 based on a condition applied on their enrichmnts in t2.
- it applies to all entities unless otherwise specified, i.e if we use ```python Field("x", as_of_time=t1))``` then this t will take precedence over a t2 specified in .as_of(t2). the sensibility of the results is up to the caller. you could get datapoints that existed in t1 < t2 based on a condition applied on their enrichments in t2.

Alternatively trying to reword it myself:

as_of specificity:

  • as_of filters defined inside of select() take complete precedence over the as_of defined on top of the query.

This means that a query:

q = ds.select("size", Field("size", as_of_time=t1, alias="size_t1"), "train")
      .as_of(t2)
      .all()

Will return fields size and train as they were as of t2, and size_t1 will have size as of t1.
This behavior is kept regardless of whether t1 came before or after t2.





## Saving queries

You can save the query you have on the datasource.
81 changes: 80 additions & 1 deletion tests/data_engine/test_querying.py
@@ -1,7 +1,8 @@
import dateutil.parser
import pytest

from dagshub.data_engine.model.datasource import (
Datasource,
Datasource, Field,
)
from dagshub.data_engine.model.errors import WrongOrderError, DatasetFieldComparisonError, FieldNotFoundError
from dagshub.data_engine.model.query import DatasourceQuery, bytes_deserializer
@@ -27,6 +28,84 @@ def test_simple_filter(ds):
assert q.to_dict() == expected


def test_versioning_query_datetime(ds):
add_int_fields(ds, "x")
# datetime
ds2 = ds[ds[Field("x", as_of_time=dateutil.parser.parse("Wed 22 Nov 2023"))] > 1]
q = ds2.get_query()
assert q.to_dict() == {'gt': {'data': {'as_of': int(dateutil.parser.parse("Wed 22 Nov 2023").timestamp()),
'field': 'x', 'value': 1}}}


def test_versioning_query_timestamp(ds):
add_int_fields(ds, "x")
# timestamp
ds2 = ds[ds[Field("x", as_of_time=1700604000)] > 1]
q = ds2.get_query()
assert q.to_dict() == {'gt': {'data': {'as_of': 1700604000, 'field': 'x', 'value': 1}}}


def test_versioning_select(ds):
add_int_fields(ds, "x")
add_int_fields(ds, "y")
add_int_fields(ds, "z")

# test select
ds2 = ((ds[ds[Field("x", as_of_time=123.99)] > 1]) &
(ds[ds[Field("x", as_of_time=345)] > 2]) |
(ds[ds[Field("y", as_of_time=789)] > 3])). \
select(Field("y", as_of_time=123), Field("x", as_of_time=456, alias="y_t1"))
q = ds2.get_query()
expected = {'or': {'children': [{'and': {'children': [{'gt': {'data': {'field': 'x', 'as_of': 123, 'value': 1}}},
{'gt': {'data': {'field': 'x', 'as_of': 345, 'value': 2}}}],
'data': None}},
{'gt': {'data': {'field': 'y', 'as_of': 789, 'value': 3}}}], 'data': None}}
assert q.to_dict() == expected

# test serialization works and includes select
expected_serialized = {'query': {'or': [{'and': [
{'filter': {'key': 'x', 'value': '1', 'valueType': 'INTEGER', 'comparator': 'GREATER_THAN', 'asOf': 123}},
{'filter': {'key': 'x', 'value': '2', 'valueType': 'INTEGER', 'comparator': 'GREATER_THAN', 'asOf': 345}}]}, {
'filter': {'key': 'y', 'value': '3', 'valueType': 'INTEGER',
'comparator': 'GREATER_THAN', 'asOf': 789}}]},
'select': [{'name': 'y', 'asOf': 123}, {'name': 'x', 'asOf': 456, 'alias': 'y_t1'}]}
assert ds2.serialize_gql_query_input() == expected_serialized


def test_versioning_select_as_strings(ds):
add_int_fields(ds, "x")
add_int_fields(ds, "y")
add_int_fields(ds, "z")

ds2 = (ds[ds["x"] > 1]).select("y", "z")
print(ds2.serialize_gql_query_input())
assert ds2.serialize_gql_query_input() == {
'query': {'filter': {'key': 'x', 'value': '1', 'valueType': 'INTEGER', 'comparator': 'GREATER_THAN'}},
'select': [{'name': 'y'}, {'name': 'z'}]}

ds2 = (ds[ds["x"] > 1]).select("y", Field("x"), "z")
print(ds2.serialize_gql_query_input())
assert ds2.serialize_gql_query_input() == {
'query': {'filter': {'key': 'x', 'value': '1', 'valueType': 'INTEGER', 'comparator': 'GREATER_THAN'}},
'select': [{'name': 'y'}, {'name': 'x'}, {'name': 'z'}]}

ds2 = (ds[ds["x"] > 1]).select("y", Field("x", as_of_time=1234), "z")
print(ds2.serialize_gql_query_input())
assert ds2.serialize_gql_query_input() == {
'query': {'filter': {'key': 'x', 'value': '1', 'valueType': 'INTEGER', 'comparator': 'GREATER_THAN'}},
'select': [{'name': 'y'}, {'name': 'x', 'asOf': 1234}, {'name': 'z'}]}


def test_versioning_dataset_deserialize(ds):
# test de-serialization works and includes select
query = {'select': [{'name': 'x', 'asOf': 1700651566}, {'name': 'y', 'alias': 'y_t1', 'asOf': 1700651563}],
'query': {'filter': {'key': 'x', 'value': 'dogs', 'valueType': 'STRING', 'comparator': 'EQUAL',
'asOf': 1700651563}}}

ds._deserialize_gql_result(query)
assert ds._select == [{'name': 'x', 'asOf': 1700651566}, {'name': 'y', 'alias': 'y_t1', 'asOf': 1700651563}]


def test_composite_filter(ds):
add_int_fields(ds, "col1", "col2")
ds2 = ds[(ds["col1"] > 5) & (ds["col2"] <= 3)]