Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement OpenSearch DB search method #100

Merged
merged 2 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,7 @@ exclude_lines =
# Don't complain if tests don't hit defensive assertion code:
raise NotImplementedError

# Ignore the default implementation of abstract methods
^\s*\.\.\.$

if TYPE_CHECKING:
101 changes: 96 additions & 5 deletions src/diracx/db/os/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@
import logging
import os
from abc import ABCMeta, abstractmethod
from datetime import datetime
from typing import Any, AsyncIterator, Self

from opensearchpy import AsyncOpenSearch

from diracx.core.exceptions import InvalidQueryError
from diracx.core.extensions import select_from_extension

# strptime pattern used to convert the date strings found in search hits
# back into Python datetime objects.
OS_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S.%f%z"

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -128,10 +132,97 @@ async def upsert(self, doc_id, document) -> None:
async def search(
self, parameters, search, sorts, *, per_page: int = 100, page: int | None = None
) -> list[dict[str, Any]]:
# TODO: Implement properly
"""Search the database for matching results.

See the DiracX search API documentation for details.
"""
body = {}
if parameters:
body["_source"] = parameters
if search:
body["query"] = apply_search_filters(self.fields, search)
body["sort"] = []
for sort in sorts:
field_name = sort["parameter"]
field_type = self.fields.get(field_name, {}).get("type")
require_type("sort", field_name, field_type, {"keyword", "long", "date"})
body["sort"].append({field_name: {"order": sort["direction"]}})

params = {}
if page is not None:
params["from"] = (page - 1) * per_page
params["size"] = per_page

response = await self.client.search(
body={"query": {"bool": {"must": [{"term": {"JobID": 798811207}}]}}},
params=dict(size=per_page),
index=f"{self.index_prefix}*",
body=body, params=params, index=f"{self.index_prefix}*"
)
hits = [hit["_source"] for hit in response["hits"]["hits"]]

# Dates are returned as strings, convert them to Python datetimes
for hit in hits:
for field_name in hit:
if field_name not in self.fields:
continue
if self.fields[field_name]["type"] == "date":
hit[field_name] = datetime.strptime(hit[field_name], OS_DATE_FORMAT)

return hits


def require_type(operator, field_name, field_type, allowed_types):
    """Check that an operation may be applied to a field of the given type.

    Args:
        operator: Name of the operation being applied; used only in the
            error message.
        field_name: Name of the field; used only in the error message.
        field_type: Mapping type of the field.
        allowed_types: Collection of types the operation supports.

    Raises:
        InvalidQueryError: If ``field_type`` is not in ``allowed_types``.
    """
    if field_type not in allowed_types:
        raise InvalidQueryError(
            f"Cannot apply {operator} to {field_name} ({field_type=}, {allowed_types=})"
        )


def apply_search_filters(db_fields, search):
"""Build an OpenSearch query from the given DiracX search parameters.

If the searched parameters cannot be efficiently translated to a query for
OpenSearch an InvalidQueryError exception is raised.
"""
result = {
"must": [],
"must_not": [],
}
for query in search:
field_name = query["parameter"]
field_type = db_fields.get(field_name, {}).get("type")
if field_type is None:
raise InvalidQueryError(
f"Field {field_name} is not included in the index mapping"
)

match operator := query["operator"]:
case "eq":
require_type(
operator, field_name, field_type, {"keyword", "long", "date"}
)
result["must"].append({"term": {field_name: {"value": query["value"]}}})
case "neq":
require_type(
operator, field_name, field_type, {"keyword", "long", "date"}
)
result["must_not"].append(
{"term": {field_name: {"value": query["value"]}}}
)
case "gt":
require_type(operator, field_name, field_type, {"long", "date"})
result["must"].append({"range": {field_name: {"gt": query["value"]}}})
case "lt":
require_type(operator, field_name, field_type, {"long", "date"})
result["must"].append({"range": {field_name: {"lt": query["value"]}}})
case "in":
require_type(
operator, field_name, field_type, {"keyword", "long", "date"}
)
result["must"].append({"terms": {field_name: query["values"]}})
# TODO: Implement like and ilike
# If the pattern is a simple "col like 'abc%'", we can use a prefix query
# Else we need to use a wildcard query where we replace % with * and _ with ?
# This should also need to handle escaping of %/_/*/?
case _:
raise InvalidQueryError(f"Unknown filter {query=}")

return {"bool": result}
4 changes: 3 additions & 1 deletion src/diracx/db/sql/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,9 @@ def apply_search_filters(table, stmt, search):
elif query["operator"] == "in":
expr = column.in_(query["values"])
elif query["operator"] in "like":
expr = column.like(query["values"])
expr = column.like(query["value"])
elif query["operator"] in "ilike":
expr = column.ilike(query["value"])
else:
raise InvalidQueryError(f"Unknown filter {query=}")
stmt = stmt.where(expr)
Expand Down
3 changes: 2 additions & 1 deletion tests/db/opensearch/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ class DummyOSDB(BaseOSDB):

fields = {
"DateField": {"type": "date"},
"IntegerField": {"type": "long"},
"IntField": {"type": "long"},
"KeywordField0": {"type": "keyword"},
"KeywordField1": {"type": "keyword"},
"KeywordField2": {"type": "keyword"},
"TextField": {"type": "text"},
Expand Down
2 changes: 1 addition & 1 deletion tests/db/opensearch/test_index_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

DUMMY_DOCUMENT = {
"DateField": datetime.now(tz=timezone.utc),
"IntegerField": 1234,
"IntField": 1234,
"KeywordField1": "keyword1",
"KeywordField2": "keyword two",
"TextField": "text value",
Expand Down
Loading