Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enables using more efficient queries for count, distinct and newer_in #921

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 26 additions & 9 deletions src/maggma/stores/open_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
sort: Optional[Dict[str, Union[Sort, int]]] = None,
skip: int = 0,
limit: int = 0,
criteria_fields: Union[List, None] = None,
) -> pd.DataFrame:
"""
Queries the Store for a set of documents
Expand All @@ -96,6 +97,10 @@
values are 1 for ascending or -1 for descending.
skip: number documents to skip (from the start of the result set)
limit: limit on total number of documents returned
criteria_fields: if this value is not None, the in-memory index will
be used for the query if all the "criteria_fields" and "properties"
are present in the in-memory index; otherwise will default to
querying store type dependent implementation

Returns:
pd.DataFrame: DataFrame that contains all the documents that match
Expand Down Expand Up @@ -153,7 +158,7 @@
ret = ret[:limit]
return ret

def count(self, criteria: Optional[Dict] = None) -> int:
def count(self, criteria: Optional[Dict] = None, criteria_fields: Union[List, None] = None) -> int:

Check warning on line 161 in src/maggma/stores/open_data.py

View check run for this annotation

Codecov / codecov/patch

src/maggma/stores/open_data.py#L161

Added line #L161 was not covered by tests
"""
Counts the number of documents matching the query criteria

Expand All @@ -162,22 +167,26 @@

Args:
criteria: see `query` method for details on how to construct
criteria_fields: see `query` method for details
"""
return len(self.query(criteria=criteria))
return len(self.query(criteria=criteria, criteria_fields=criteria_fields))

Check warning on line 172 in src/maggma/stores/open_data.py

View check run for this annotation

Codecov / codecov/patch

src/maggma/stores/open_data.py#L172

Added line #L172 was not covered by tests

def distinct(self, field: str, criteria: Optional[Dict] = None) -> pd.Series:
def distinct(

Check warning on line 174 in src/maggma/stores/open_data.py

View check run for this annotation

Codecov / codecov/patch

src/maggma/stores/open_data.py#L174

Added line #L174 was not covered by tests
self, field: str, criteria: Optional[Dict] = None, criteria_fields: Union[List, None] = None
) -> pd.Series:
"""
Get all distinct values for a field

Args:
field: the field(s) to get distinct values for
criteria: see `query` method for details on how to construct
criteria_fields: see `query` method for details

Returns:
pd.Series: Series of all the distinct values for the provided field
(after filtering by the provided criteria)
"""
ret = self.query(criteria=criteria, properties=[field])
ret = self.query(criteria=criteria, properties=[field], criteria_fields=criteria_fields)

Check warning on line 189 in src/maggma/stores/open_data.py

View check run for this annotation

Codecov / codecov/patch

src/maggma/stores/open_data.py#L189

Added line #L189 was not covered by tests
return ret[field].drop_duplicates()

@property
Expand All @@ -196,7 +205,11 @@
return LU_KEY_ISOFORMAT[0](max)

def newer_in(
self, target: "PandasMemoryStore", criteria: Optional[Dict] = None, exhaustive: bool = False
self,
target: "PandasMemoryStore",
criteria: Optional[Dict] = None,
exhaustive: bool = False,
criteria_fields: Union[List, None] = None,
) -> pd.Series:
"""
Returns the keys of documents that are newer in the target
Expand All @@ -208,6 +221,7 @@
exhaustive: triggers an item-by-item check vs. checking
the last_updated of the target Store and using
that to filter out new items in
criteria_fields: see `query` method for details

Returns:
pd.Series: if no criteria is provided a Series of the keys of documents in the target store
Expand All @@ -232,30 +246,33 @@
props = [self.key, self.last_updated_field]
dates = {
d[self.key]: LU_KEY_ISOFORMAT[0](d.get(self.last_updated_field, datetime.max))
for _, d in self.query(properties=props).iterrows()
for _, d in self.query(properties=props, criteria_fields=criteria_fields).iterrows()
}
# Get the last_updated for the store we're comparing with
props = [target.key, target.last_updated_field]
target_dates = {
d[target.key]: LU_KEY_ISOFORMAT[0](d.get(target.last_updated_field, datetime.min))
for _, d in target.query(criteria=criteria, properties=props).iterrows()
for _, d in target.query(
criteria=criteria, properties=props, criteria_fields=criteria_fields
).iterrows()
}
new_keys = set(target_dates.keys()) - set(dates.keys())
updated_keys = {key for key, date in dates.items() if target_dates.get(key, datetime.min) > date}
return pd.Series(data=list(new_keys | updated_keys), name=self.key)

criteria = {"query": f"{self.last_updated_field} > '{LU_KEY_ISOFORMAT[1](self.last_updated)}'"}
return target.distinct(field=self.key, criteria=criteria)
return target.distinct(field=self.key, criteria=criteria, criteria_fields=[self.last_updated_field])

Check warning on line 264 in src/maggma/stores/open_data.py

View check run for this annotation

Codecov / codecov/patch

src/maggma/stores/open_data.py#L264

Added line #L264 was not covered by tests

def get_merged_items(self, to_dt: pd.DataFrame, from_dt: pd.DataFrame) -> pd.DataFrame:
orig_columns = to_dt.columns
merged = to_dt.merge(from_dt, on=self.key, how="left", suffixes=("", "_B"))
for column in from_dt.columns:
if column not in self.key:
oc_dtype = merged[column].dtype

Check warning on line 271 in src/maggma/stores/open_data.py

View check run for this annotation

Codecov / codecov/patch

src/maggma/stores/open_data.py#L271

Added line #L271 was not covered by tests
s = merged.pop(column + "_B")
s.name = column
merged.update(s)
merged.infer_objects(copy=False)
merged[column].astype(oc_dtype)

Check warning on line 275 in src/maggma/stores/open_data.py

View check run for this annotation

Codecov / codecov/patch

src/maggma/stores/open_data.py#L275

Added line #L275 was not covered by tests
return pd.concat(
(merged[orig_columns], from_dt[~from_dt.set_index(self.key).index.isin(to_dt.set_index(self.key).index)]),
ignore_index=True,
Expand Down
Loading