Skip to content

Commit

Permalink
Add search method for OpenSearch DBs
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisburr committed Sep 21, 2023
1 parent 655edf0 commit d42e236
Show file tree
Hide file tree
Showing 5 changed files with 482 additions and 8 deletions.
101 changes: 96 additions & 5 deletions src/diracx/db/os/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@
import logging
import os
from abc import ABCMeta, abstractmethod
from datetime import datetime
from typing import Any, AsyncIterator, Self

from opensearchpy import AsyncOpenSearch

from diracx.core.exceptions import InvalidQueryError
from diracx.core.extensions import select_from_extension

OS_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S.%f%z"

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -128,10 +132,97 @@ async def upsert(self, doc_id, document) -> None:
async def search(
self, parameters, search, sorts, *, per_page: int = 100, page: int | None = None
) -> list[dict[str, Any]]:
# TODO: Implement properly
"""Search the database for matching results.
See the DiracX search API documentation for details.
"""
body = {}
if parameters:
body["_source"] = parameters
if search:
body["query"] = apply_search_filters(self.fields, search)
body["sort"] = []
for sort in sorts:
field_name = sort["parameter"]
field_type = self.fields.get(field_name, {}).get("type")
require_type("sort", field_name, field_type, {"keyword", "long", "date"})
body["sort"].append({field_name: {"order": sort["direction"]}})

params = {}
if page is not None:
params["from"] = (page - 1) * per_page
params["size"] = per_page

response = await self.client.search(
body={"query": {"bool": {"must": [{"term": {"JobID": 798811207}}]}}},
params=dict(size=per_page),
index=f"{self.index_prefix}*",
body=body, params=params, index=f"{self.index_prefix}*"
)
hits = [hit["_source"] for hit in response["hits"]["hits"]]

# Dates are returned as strings, convert them to Python datetimes
for hit in hits:
for field_name in hit:
if field_name not in self.fields:
continue
if self.fields[field_name]["type"] == "date":
hit[field_name] = datetime.strptime(hit[field_name], OS_DATE_FORMAT)

return hits


def require_type(operator, field_name, field_type, allowed_types):
    """Ensure an operation is supported for a field's mapped type.

    Args:
        operator: Name of the operation being applied; used only in the
            error message.
        field_name: Name of the field the operation targets; used only in
            the error message.
        field_type: Type of the field, or ``None`` when it is unknown.
        allowed_types: Collection of types the operation supports.

    Raises:
        InvalidQueryError: If ``field_type`` is not in ``allowed_types``.
    """
    if field_type not in allowed_types:
        raise InvalidQueryError(
            f"Cannot apply {operator} to {field_name} ({field_type=}, {allowed_types=})"
        )


def apply_search_filters(db_fields, search):
    """Translate DiracX search parameters into an OpenSearch ``bool`` query.

    Args:
        db_fields: Mapping of field name to its index mapping entry; the
            ``type`` key of each entry is used to validate the operators.
        search: Iterable of filters, each with a ``parameter``, an
            ``operator`` and a ``value`` (or ``values`` for ``in``).

    Returns:
        A dictionary of the form ``{"bool": {"must": [...], "must_not": [...]}}``.

    Raises:
        InvalidQueryError: If a field is missing from ``db_fields``, the
            operator is unknown, or the operator cannot be efficiently
            applied to the field's type.
    """
    must = []
    must_not = []

    for query in search:
        field_name = query["parameter"]
        field_type = db_fields.get(field_name, {}).get("type")
        if field_type is None:
            raise InvalidQueryError(
                f"Field {field_name} is not included in the index mapping"
            )

        operator = query["operator"]
        if operator == "eq":
            require_type(operator, field_name, field_type, {"keyword", "long", "date"})
            must.append({"term": {field_name: {"value": query["value"]}}})
        elif operator == "neq":
            require_type(operator, field_name, field_type, {"keyword", "long", "date"})
            must_not.append({"term": {field_name: {"value": query["value"]}}})
        elif operator == "gt":
            require_type(operator, field_name, field_type, {"long", "date"})
            must.append({"range": {field_name: {"gt": query["value"]}}})
        elif operator == "lt":
            require_type(operator, field_name, field_type, {"long", "date"})
            must.append({"range": {field_name: {"lt": query["value"]}}})
        elif operator == "in":
            require_type(operator, field_name, field_type, {"keyword", "long", "date"})
            must.append({"terms": {field_name: query["values"]}})
        else:
            # TODO: Implement like and ilike.
            # A simple "col like 'abc%'" pattern can use a prefix query;
            # anything else needs a wildcard query with % replaced by * and
            # _ replaced by ?. Escaping of %, _, * and ? must be handled too.
            raise InvalidQueryError(f"Unknown filter {query=}")

    return {"bool": {"must": must, "must_not": must_not}}
4 changes: 3 additions & 1 deletion src/diracx/db/sql/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,9 @@ def apply_search_filters(table, stmt, search):
elif query["operator"] == "in":
expr = column.in_(query["values"])
elif query["operator"] in "like":
expr = column.like(query["values"])
expr = column.like(query["value"])
elif query["operator"] in "ilike":
expr = column.ilike(query["value"])
else:
raise InvalidQueryError(f"Unknown filter {query=}")
stmt = stmt.where(expr)
Expand Down
3 changes: 2 additions & 1 deletion tests/db/opensearch/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ class DummyOSDB(BaseOSDB):

fields = {
"DateField": {"type": "date"},
"IntegerField": {"type": "long"},
"IntField": {"type": "long"},
"KeywordField0": {"type": "keyword"},
"KeywordField1": {"type": "keyword"},
"KeywordField2": {"type": "keyword"},
"TextField": {"type": "text"},
Expand Down
2 changes: 1 addition & 1 deletion tests/db/opensearch/test_index_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

DUMMY_DOCUMENT = {
"DateField": datetime.now(tz=timezone.utc),
"IntegerField": 1234,
"IntField": 1234,
"KeywordField1": "keyword1",
"KeywordField2": "keyword two",
"TextField": "text value",
Expand Down
Loading

0 comments on commit d42e236

Please sign in to comment.