versioning #386

Merged
merged 36 commits on Dec 6, 2023
Commits
f49e289
versioning #1
sdafni Nov 9, 2023
9a3d23c
versioning #2
sdafni Nov 14, 2023
dbc06c9
versioning #3
sdafni Nov 14, 2023
3862024
versioning #3
sdafni Nov 14, 2023
044cc74
versioning #4
sdafni Nov 15, 2023
1b9cc2a
versioning #5
sdafni Nov 15, 2023
2aa9153
versioning #6 dataclass
sdafni Nov 15, 2023
21bf506
versioning - merge master
sdafni Nov 15, 2023
c688a43
versioning - fix tests
sdafni Nov 15, 2023
274008c
pep shit
sdafni Nov 15, 2023
63d49da
not always asof
sdafni Nov 16, 2023
1977320
dataset + unit test
sdafni Nov 19, 2023
5c2574b
pep shit
sdafni Nov 19, 2023
9a8e0c2
fix tests+not always return asof, accept float ts
sdafni Nov 20, 2023
3741742
wrong int() place
sdafni Nov 20, 2023
5626840
cr fixes #1
sdafni Nov 22, 2023
2bb9d0d
cr fixes #2
sdafni Nov 22, 2023
713bca4
cr fixes #3 pep
sdafni Nov 22, 2023
cc82b87
cr fixes #4 ts fix
sdafni Nov 22, 2023
306287a
cr fixes #5 more pep
sdafni Nov 22, 2023
906ffd6
non existing columm
sdafni Nov 22, 2023
40873dc
cr fixes #6
sdafni Nov 23, 2023
00f971a
pep
sdafni Nov 23, 2023
cf09446
global asof api
sdafni Nov 27, 2023
b5207b7
global asof fixes
sdafni Nov 28, 2023
fa62205
source select default fix
sdafni Nov 28, 2023
146ebc6
functions doc
sdafni Nov 28, 2023
117e7b4
readme doc
sdafni Nov 28, 2023
cb11663
Update data_engine.md
sdafni Nov 28, 2023
557389a
Update data_engine.md
sdafni Nov 28, 2023
889841b
Update data_engine.md
sdafni Nov 28, 2023
27b2857
readme doc #2
sdafni Nov 28, 2023
cf7333c
readme doc #3
sdafni Nov 28, 2023
e77be14
readme doc #4
sdafni Nov 28, 2023
b0f1ee5
include all option
sdafni Nov 29, 2023
6e46ef9
wildcard syntax
sdafni Dec 6, 2023
1 change: 0 additions & 1 deletion dagshub/data_engine/client/data_client.py
@@ -155,7 +155,6 @@ def update_metadata(self, datasource: Datasource, entries: List[DatapointMetadat
params = GqlMutations.update_metadata_params(
datasource_id=datasource.source.id, datapoints=[e.to_dict() for e in entries]
)

return self._exec(q, params)

def update_metadata_fields(self, datasource: Datasource, metadata_field_props: List[MetadataFieldSchema]):
5 changes: 2 additions & 3 deletions dagshub/data_engine/datasets.py
@@ -7,7 +7,6 @@
from dagshub.data_engine.model.datasource import Datasource
from dagshub.data_engine.model.datasource_state import DatasourceState
from dagshub.data_engine.model.errors import DatasetNotFoundError
from dagshub.data_engine.model.query import DatasourceQuery


def get_datasets(repo: str) -> List[Datasource]:
@@ -41,7 +40,7 @@ def _from_gql_result(repo: str, dataset_result: "DatasetResult") -> "Datasource"
ds = Datasource(DatasourceState.from_gql_result(repo, dataset_result.datasource))

query_dict = json.loads(dataset_result.datasetQuery)
if "query" in query_dict:
ds._query = DatasourceQuery.deserialize(query_dict["query"])

ds._deserialize_gql_result(query_dict)

return ds
108 changes: 103 additions & 5 deletions dagshub/data_engine/model/datasource.py
@@ -1,4 +1,5 @@
import base64
import datetime
import gzip
import json
import logging
@@ -61,15 +62,54 @@ class DatapointMetadataUpdateEntry(json.JSONEncoder):
allowMultiple: bool = False


@dataclass
class Field:
column: str
as_of_time: Optional[Union[float, datetime.datetime]] = None
alias: Optional[str] = None

@property
def as_of_timestamp(self) -> Optional[int]:
if self.as_of_time is not None:
if isinstance(self.as_of_time, datetime.datetime):
return int(self.as_of_time.timestamp())
else:
return int(self.as_of_time)
else:
return None

def to_dict(self, ds: "Datasource") -> Dict[str, Any]:
if not ds.has_field(self.column):
raise FieldNotFoundError(self.column)

res_dict = {"name": self.column}
if self.as_of_time is not None:
res_dict["asOf"] = self.as_of_timestamp
if self.alias:
res_dict["alias"] = self.alias
return res_dict


class Datasource:
def __init__(self, datasource: "DatasourceState", query: Optional[DatasourceQuery] = None):
def __init__(self, datasource: "DatasourceState", query: Optional[DatasourceQuery] = None, select=None, as_of=None):
self._source = datasource
if query is None:
query = DatasourceQuery()
self._query = query

self._select = select or []
self._global_as_of = as_of
self.serialize_gql_query_input()

@property
def global_as_of_timestamp(self) -> Optional[int]:
if self._global_as_of is not None:
if isinstance(self._global_as_of, datetime.datetime):
return int(self._global_as_of.timestamp())
else:
return int(self._global_as_of)
else:
return None

@property
def source(self) -> "DatasourceState":
return self._source
@@ -82,7 +122,7 @@ def clear_query(self):
self._query = DatasourceQuery()

def __deepcopy__(self, memodict={}) -> "Datasource":
res = Datasource(self._source, self._query.__deepcopy__())
res = Datasource(self._source, self._query.__deepcopy__(), self._select, self._global_as_of)
return res

def get_query(self):
@@ -93,9 +133,20 @@ def annotation_fields(self) -> List[str]:
return [f.name for f in self.fields if f.is_annotation()]

def serialize_gql_query_input(self):
return {
result = {
"query": self._query.serialize_graphql(),
}
if self._select:
result["select"] = self._select

if self.global_as_of_timestamp:
result["asOf"] = self.global_as_of_timestamp
return result

def _deserialize_gql_result(self, query_dict):
if "query" in query_dict:
self._query = DatasourceQuery.deserialize(query_dict["query"])
self._select = query_dict.get("select")

def sample(self, start: Optional[int] = None, end: Optional[int] = None):
if start is not None:
@@ -120,6 +171,48 @@ def all(self) -> "QueryResult":
self._check_preprocess()
return self._source.client.get_datapoints(self)

def select(self, *selected: Union[str, Field]):
"""
using select() you can choose which columns will appear on the query result,
what their names will be (alias) and from what time. For example:
t = datetime.now(timezone.utc) - timedelta(hours=24)
q1 = (ds["size"] > 5).select(Field("size", as_of_time=t, alias="size_asof_24h_ago"), Field("episode"))
"""
new_ds = self.__deepcopy__()

include_all = False
for s in selected:
if isinstance(s, Field):
new_ds._select.append(s.to_dict(self))
else:
if s != '*':
new_ds._select.append({"name": s})
else:
include_all = True

if include_all:
aliases = [s["alias"] for s in new_ds._select if "alias" in s]
for f in self.fields:
if f.name in aliases:
raise ValueError(f"alias {f.name} can't be used, a column with that name exists")
new_ds._select.append({"name": f.name})
return new_ds

def as_of(self, time: Union[float, datetime.datetime]):
"""
as_of() applied on query allows you to view a snapshot of datapoint/enrichments. For example:

t = datetime.now(timezone.utc) - timedelta(hours=24)
q1 = (ds["size"] > 5).as_of(t)

in the above example all datapoints whose creation time is no later than 't',
and that match the condition at 't' - are returned.
"""
new_ds = self.__deepcopy__()

new_ds._global_as_of = time
return new_ds

def _check_preprocess(self):
self.source.get_from_dagshub()
if (
@@ -499,7 +592,7 @@ def __repr__(self):
queried_ds = ds[ds["value"] == 5]
"""

def __getitem__(self, other: Union[slice, str, "Datasource"]):
def __getitem__(self, other: Union[slice, str, "Datasource", "Field"]):
# Slicing - get items from the slice
if type(other) is slice:
return self.sample(other.start, other.stop)
@@ -515,6 +608,11 @@ def __getitem__(self, other: Union[slice, str, "Datasource"]):
else:
new_ds._query.compose("and", other_query)
return new_ds
elif type(other) is Field:
if not self.has_field(other.column):
raise FieldNotFoundError(other.column)
new_ds._query = DatasourceQuery(other.column, other.as_of_timestamp)
return new_ds
# "index" is a datasource with a query - return the datasource inside
# Example:
# ds = Dataset()
20 changes: 16 additions & 4 deletions dagshub/data_engine/model/query.py
@@ -54,14 +54,20 @@ class FieldFilterOperand(enum.Enum):


class DatasourceQuery:
def __init__(self, column_or_query: Optional[Union[str, "DatasourceQuery"]] = None):
def __init__(self, column_or_query: Optional[Union[str, "DatasourceQuery"]] = None, as_of: Optional[int] = None):
self._operand_tree: Tree = Tree()
if type(column_or_query) is str:
# If it's ds["column"] then the root node is just the column name, will be filled later
self._operand_tree.create_node(UNFILLED_NODE_TAG, data={"field": column_or_query})
data = {"field": column_or_query}
if as_of:
data["as_of"] = int(as_of)
self._operand_tree.create_node(UNFILLED_NODE_TAG, data=data)
elif column_or_query is not None:
self._operand_tree.create_node(column_or_query)

if as_of:
self._as_of = as_of

def __repr__(self):
if self.is_empty:
return "Query: empty"
@@ -142,6 +148,8 @@ def _serialize_node(node: Node, tree: Tree) -> Dict:
raise WrongOperatorError(f"Operator {operand} is not supported")
key = node.data["field"]
value = node.data["value"]
as_of = node.data.get("as_of")

value_type = metadataTypeLookup[type(value)].value
if type(value) is bytes:
# TODO: this will need to probably be changed when we allow actual binary field comparisons
@@ -153,14 +161,17 @@ def _serialize_node(node: Node, tree: Tree) -> Dict:
f"Value type {value_type} is not supported for querying.\r\n"
f"Supported types: {list(metadataTypeLookup.keys())}"
)
return {
res = {
"filter": {
"key": key,
"value": str(value),
"valueType": value_type,
"comparator": query_op.value,
}
}
if as_of:
res["filter"]["asOf"] = as_of
return res

@staticmethod
def deserialize(serialized_query: Dict) -> "DatasourceQuery":
@@ -192,7 +203,8 @@ def _deserialize_node(node_dict: Dict, tree: Tree, parent_node=None) -> None:
value_type = metadataTypeLookupReverse[val["valueType"]]
converter = _metadataTypeCustomConverters.get(value_type, lambda x: value_type(x))
value = converter(val["value"])
node = Node(tag=comparator, data={"field": key, "value": value})
as_of = val.get("asOf")
node = Node(tag=comparator, data={"field": key, "value": value, "as_of": as_of})
tree.add_node(node, parent_node)
elif op_type in ("and", "or"):
main_node = Node(tag=op_type)
53 changes: 53 additions & 0 deletions docs/data_engine.md
@@ -281,6 +281,59 @@ filtered_ds = ds[ds["episode"] > 5]
filtered_ds2 = filtered_ds[filtered_ds["has_baby_yoda"] == True]
```

### Versioning
#### Query filtering:
An extended syntax lets you query according to different versions of enrichments. For example:
```python
# the "size" metadata changes frequently; we want to query its value from 24h ago
t = datetime.now(timezone.utc) - timedelta(hours=24)

q1 = ds[Field("size", as_of_time=t)] > 5
```
in the above example all datapoints whose "size" column updated no later than 't' that match the condition '>5' are returned.


#### Query select:
Using select() you can choose which columns appear in the query result, what their names will be (via alias), and from what point in time they are taken. For example:

```python
t = datetime.now(timezone.utc) - timedelta(hours=24)

q1 = (ds["size"] > 5).select(Field("size", as_of_time=t, alias="size_asof_24h_ago"), Field("episode"))
```
In the above example, the result set of datapoints will have 2 metadata columns: "size_asof_24h_ago" and "episode";
all other metadata columns are omitted. To get all metadata columns in addition to the selected list,
add "*" to the list, for example:
```python
q1 = (ds["size"] > 5).select(Field("size", as_of_time=t, alias="size_asof_24h_ago"), "*")
```

#### Global as_of time:
Applying as_of() to a query allows you to view a snapshot of datapoints and their enrichments. For example:

```python
t = datetime.now(timezone.utc) - timedelta(hours=24)

q1 = (ds["size"] > 5).as_of(t)
```
In the above example, all datapoints whose creation time is no later than 't', and that match the condition as evaluated at 't', are returned.


#### Notes and limitations:

##### Time parameter:
- The time parameter can be a POSIX timestamp or a `datetime` object (see the sketch below).
- Pay attention to timezones: use a timestamp if known, or a relative datetime (as in the above examples). If you use a specific date such as `dateutil.parser.parse("Tue 28 Nov 11:29 +2:00")`, specify the UTC offset as shown here; otherwise the date can translate to different timestamps on the machine running the client and on the DagsHub backend.
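
A minimal sketch of the two accepted time forms, assuming `ds` is a datasource handle like the one used in the examples above (the variable names here are illustrative only):

```python
from datetime import datetime, timedelta, timezone

# A timezone-aware datetime, relative to "now", so there is no timezone ambiguity
t_dt = datetime.now(timezone.utc) - timedelta(hours=24)

# The same instant expressed as a POSIX timestamp (seconds since the epoch)
t_posix = t_dt.timestamp()

# Both forms should resolve to the same snapshot
q_from_datetime = (ds["size"] > 5).as_of(t_dt)
q_from_timestamp = (ds["size"] > 5).as_of(t_posix)
```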
##### Select list:
- both "x" and `Field("x")` can be used
- alias, as_of_time - are optional
- we currently do not check the list for contradictions/overwrites/duplications, i.e `select(Field("x", as_of_time=t1), Field("x", as_of_time=t2))` does not make sense since there is no alias to differentiate, the result will not reflect the intention. also `select("x","x")`
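
As a sketch of how to avoid the ambiguity described above (assuming `ds` and `Field` as in the earlier examples), give each selected version of the column its own alias:

```python
from datetime import datetime, timedelta, timezone

t1 = datetime.now(timezone.utc) - timedelta(hours=24)
t2 = datetime.now(timezone.utc) - timedelta(hours=48)

# Distinct aliases keep the two versions of "size" from colliding in the result set
q = (ds["episode"] > 5).select(
    Field("size", as_of_time=t1, alias="size_24h_ago"),
    Field("size", as_of_time=t2, alias="size_48h_ago"),
)
```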
##### Global as_of behavior:
- The global as_of applies to all entities unless otherwise specified, i.e. if `Field("x", as_of_time=t1)` is used, then t1 takes precedence over a t2 specified in `.as_of(t2)` (see the sketch below). Whether the combined result is sensible is up to the caller: you could get datapoints that existed at t1 < t2, based on a condition applied to their enrichments at t2.
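
A sketch of the precedence rule, again assuming `ds` and `Field` as above; whether mixing the two times is meaningful is left to the caller:

```python
from datetime import datetime, timedelta, timezone

t1 = datetime.now(timezone.utc) - timedelta(hours=24)
t2 = datetime.now(timezone.utc) - timedelta(hours=48)

# The field-level as_of_time (t1) takes precedence for the "x" condition;
# the global as_of (t2) applies to everything not otherwise specified.
q = (ds[Field("x", as_of_time=t1)] > 5).as_of(t2)
```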




## Saving queries

You can save the query you have on the datasource.