Fix /datasets?filter to select across namespaces (#3359)
* Fix /datasets?filter to select across namespaces

PBENCH-1117

I discovered that the single simplistic `LEFT JOIN` allows combining native
`Dataset` and `Metadata` terms in a `SELECT`, but with limitations: because
the SQL join constructs a row for each `Metadata` match, matches for, e.g.,
`server.origin` and `dataset.metalog.pbench.script` or `global.server.legacy`
will appear on separate table rows. Each has duplicate `Dataset` columns, but
that doesn't help when trying to select across namespaces.

The only effective solution I was able to find was to cascade the joins in
order to build a new table with a separate column for each metadata namespace
row matching the dataset. This allows a single `SELECT` to work across the
columns in the new table.
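
To make the fix concrete, here is a minimal, self-contained SQLAlchemy sketch of the cascaded-join technique described above. The `Dataset` and `Metadata` models and the in-memory SQLite database are simplified assumptions for illustration, not the pbench server's actual schema; only the one-aliased-join-per-namespace pattern mirrors this change.

```python
# Minimal sketch (assumed simplified schema, NOT the pbench server's real
# models) of the cascaded-join approach: one aliased Metadata join per key
# namespace, so a single WHERE clause can match values from several
# namespaces on the same joined row.
from sqlalchemy import JSON, Column, ForeignKey, Integer, String, and_, create_engine
from sqlalchemy.orm import Session, aliased, declarative_base

Base = declarative_base()


class Dataset(Base):
    __tablename__ = "datasets"
    id = Column(Integer, primary_key=True)
    name = Column(String)


class Metadata(Base):
    __tablename__ = "dataset_metadata"
    id = Column(Integer, primary_key=True)
    dataset_ref = Column(Integer, ForeignKey("datasets.id"))
    key = Column(String)  # "metalog", "server", "global", or "user"
    value = Column(JSON)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all(
        [
            Dataset(id=1, name="drb"),
            Dataset(id=2, name="test"),
            Metadata(dataset_ref=1, key="server", value={"origin": "EC2"}),
            Metadata(dataset_ref=1, key="global", value={"dashboard": {"theme": "dark"}}),
            Metadata(dataset_ref=2, key="server", value={"origin": "EC2"}),
        ]
    )
    session.commit()

    # With a single plain LEFT JOIN, a dataset's "server" and "global" matches
    # land on separate join rows, so an AND across namespaces can never be
    # satisfied. One outer join per namespace puts all four value columns on
    # each dataset's row.
    aliases = {k: aliased(Metadata) for k in ("metalog", "server", "global", "user")}
    query = session.query(Dataset)
    for key, table in aliases.items():
        query = query.outerjoin(
            table, and_(table.dataset_ref == Dataset.id, table.key == key)
        )

    # Select across two namespaces in one WHERE clause.
    query = query.filter(
        aliases["server"].value["origin"].as_string() == "EC2",
        aliases["global"].value["dashboard", "theme"].as_string() == "dark",
    )
    print([d.name for d in query.all()])  # only "drb" satisfies both terms
```

The trade-off is the one acknowledged in the commit's own comments: the four namespaces are joined explicitly, which is rigid, but in exchange every namespace's value lands on the same joined row, so a single `SELECT ... WHERE` can combine them.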
dbutenhof authored Mar 27, 2023
1 parent 91980b0 commit 2d1e6af
Showing 3 changed files with 165 additions and 63 deletions.
115 changes: 78 additions & 37 deletions lib/pbench/server/api/resources/datasets_list.py
@@ -7,7 +7,8 @@
from flask.wrappers import Request, Response
from sqlalchemy import and_, cast, or_, String
from sqlalchemy.exc import ProgrammingError, StatementError
from sqlalchemy.orm import Query
from sqlalchemy.orm import aliased, Query
from sqlalchemy.sql.expression import Alias

from pbench.server import JSON, JSONOBJECT, OperationCode, PbenchServerConfig
from pbench.server.api.resources import (
@@ -134,7 +135,9 @@ def get_paginated_obj(
return items, paginated_result

@staticmethod
def filter_query(filters: list[str], query: Query) -> Query:
def filter_query(
filters: list[str], aliases: dict[str, Alias], query: Query
) -> Query:
"""Provide Metadata filtering for datasets.
Add SQLAlchemy filters to fulfill the intent of a series of filter
@@ -231,9 +234,7 @@ def filter_query(filters: list[str], query: Query) -> Query:
raise APIAbort(HTTPStatus.BAD_REQUEST, str(MetadataBadKey(k)))
keys = k.split(".")
native_key = keys.pop(0).lower()
terms = []
use_dataset = False
user_private = None
filter = None

if native_key == Metadata.DATASET:
second = keys[0].lower()
@@ -258,8 +259,7 @@
raise APIAbort(
HTTPStatus.BAD_REQUEST, str(MetadataBadKey(k))
) from e
use_dataset = True
terms = [column.contains(v) if contains else column == v]
filter = column.contains(v) if contains else column == v
elif native_key == Metadata.USER:
# The user namespace requires special handling because the
# values are always qualified by the owning user rather than
@@ -270,17 +270,12 @@
HTTPStatus.UNAUTHORIZED,
f"Metadata key {k} cannot be used by an unauthenticated client",
)
user_private = [Metadata.user_id == user_id]

if not use_dataset:
expression = Metadata.value[keys].as_string()
terms = [
Metadata.key == native_key,
expression.contains(v) if contains else expression == v,
]
if user_private:
terms.extend(user_private)
filter = and_(*terms)

# NOTE: We don't want to *evaluate* the filter expression here, so
# check explicitly for None.
if filter is None:
expression = aliases[native_key].value[keys].as_string()
filter = expression.contains(v) if contains else expression == v

if combine_or:
or_list.append(filter)
@@ -327,14 +322,8 @@ def accumulate(self, aggregate: JSONOBJECT, key: str, value: Any):
def keyspace(self, query: Query) -> JSONOBJECT:
"""Aggregate the dataset metadata keyspace
Run the query we've compiled, but instead of returning Dataset proxies,
we only want the metadata key/value pairs we've selected.
NOTE: The SQL left outer join returns a row for each row in the "left"
table (Dataset) even if there is no matching foreign key in the "right"
table (Metadata). This means a dataset with no metadata will result in
a join row here with key and value of None. The `elif` in the loop will
silently ignore rows with a null key to handle this case.
Run the query we've compiled, and process the metadata collections
attached to each dataset.
Args:
query: The basic filtered SQLAlchemy query object
@@ -345,14 +334,18 @@ def keyspace(self, query: Query) -> JSONOBJECT:
aggregate: JSONOBJECT = {
"dataset": {c.name: None for c in Dataset.__table__._columns}
}
list = query.with_entities(Metadata.key, Metadata.value).all()
for k, v in list:
# "metalog" is a top-level key in the Metadata schema, but we
# report it as a sub-key of "dataset".
if k == Metadata.METALOG:
self.accumulate(aggregate["dataset"], k, v)
elif k:
self.accumulate(aggregate, k, v)

Database.dump_query(query, current_app.logger)

datasets = query.all()
for d in datasets:
for m in d.metadatas:
# "metalog" is a top-level key in the Metadata schema, but we
# report it as a sub-key of "dataset".
if m.key == Metadata.METALOG:
self.accumulate(aggregate["dataset"], m.key, m.value)
else:
self.accumulate(aggregate, m.key, m.value)
return aggregate

def datasets(self, request: Request, json: JSONOBJECT, query: Query) -> JSONOBJECT:
@@ -416,9 +409,58 @@ def _get(
A JSON response containing the paginated query result
"""
json = params.query
auth_id = Auth.get_current_user_id()

# Build a SQLAlchemy Query object expressing all of our constraints
query = Database.db_session.query(Dataset).outerjoin(Metadata)

"""SQL notes:
We want to be able to query across all four metadata key namespaces,
and a SELECT WHERE clause can only match within individual rows.
That is, if we look for "value1 = 'a' and value2 = 'b'", we won't
find a match with a single LEFT JOIN as the matched Metadata is
distributed across separate rows in the join table.
The way we get around this is ugly and inflexible, but I'm unable to
find a better way. We actually JOIN against Metadata four separate
times; all are constrained by the Dataset foreign key, and each is
additionally constrained by the primary Metadata.key value. (And the
"user" namespace is additionally constrained by the authorized user
ID, and omitted if we're not authenticated.)
This results in a join table with a JSON value column for each of the
tables, so that a single SELECT WHERE can match against all four of the
namespaces on a single row in the join table. In order to be able to
access the duplicate Metadata.value columns, we first create some
SQL name aliases. What we'll end up with, before we start adding our
filters, is something (simplified) like:
Dataset   mtable           stable           gtable              utable
-------   --------------   --------------   -----------------   -----------------
drb       {"pbench": {}}   {"origin": {}}   {"dashboard": {}}   {"dashboard": {}}
test      {"pbench": {}}   {"origin": {}}   {"dashboard": {}}   {"dashboard": {}}
"""
aliases = {
Metadata.METALOG: aliased(Metadata),
Metadata.SERVER: aliased(Metadata),
Metadata.GLOBAL: aliased(Metadata),
Metadata.USER: aliased(Metadata),
}
query = Database.db_session.query(Dataset)
for key, table in aliases.items():
terms = [table.dataset_ref == Dataset.id, table.key == key]
if key == Metadata.USER:
if not auth_id:
continue
terms.append(table.user_id == auth_id)
query = query.outerjoin(table, and_(*terms))

if "start" in json and "end" in json:
query = query.filter(Dataset.uploaded.between(json["start"], json["end"]))
elif "start" in json:
@@ -428,7 +470,7 @@
if "name" in json:
query = query.filter(Dataset.name.contains(json["name"]))
if "filter" in json:
query = self.filter_query(json["filter"], query)
query = self.filter_query(json["filter"], aliases, query)

# The "mine" filter allows queries for datasets that are (true) or
# aren't (false) owned by the authenticated user. In the absense of
@@ -441,7 +483,6 @@
HTTPStatus.BAD_REQUEST,
"'owner' and 'mine' filters cannot be used together",
)
auth_id = Auth.get_current_user_id()
if not auth_id:
raise APIAbort(
HTTPStatus.BAD_REQUEST, "'mine' filter requires authentication"
9 changes: 5 additions & 4 deletions lib/pbench/server/database/database.py
@@ -93,16 +93,17 @@ def init_db(server_config: PbenchServerConfig, logger: Logger):
)

@staticmethod
def dump_query(query: Query, logger: Logger):
def dump_query(query: Query, logger: Logger, level: int = DEBUG):
"""Dump a fully resolved SQL query if DEBUG logging is enabled
Args:
query: A SQLAlchemy Query object
logger: A Python logger object
level: [DEBUG] level at which to log
"""
if logger.isEnabledFor(DEBUG):
if logger.isEnabledFor(level):
try:
q_str = query.statement.compile(compile_kwargs={"literal_binds": True})
logger.debug("QUERY {}", q_str)
logger.log(level, "QUERY {}", q_str)
except Exception as e:
logger.debug("Can't compile query {}: {}", query, e)
logger.log(level, "Can't compile query {}: {}", query, e)
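
For context, the new `level` argument lets a caller surface the compiled SQL above DEBUG. A hedged usage sketch follows; the wrapper function and the choice of INFO are illustrative assumptions, not part of this commit:

```python
# Illustrative caller only: the wrapper name and the decision to log at INFO
# are assumptions; only Database.dump_query's signature comes from this diff.
from logging import INFO

from flask import current_app

from pbench.server.database.database import Database


def log_query_at_info(query) -> None:
    """Dump the compiled SQL of a SQLAlchemy query at INFO level."""
    Database.dump_query(query, current_app.logger, level=INFO)
```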
104 changes: 82 additions & 22 deletions lib/pbench/test/unit/server/test_datasets_list.py
@@ -1,11 +1,13 @@
import datetime
from http import HTTPStatus
import re
from typing import Optional

import pytest
import requests
from sqlalchemy import and_
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.orm import Query
from sqlalchemy.orm import aliased, Query

from pbench.server import JSON, JSONARRAY, JSONOBJECT
from pbench.server.api.resources import APIAbort
@@ -47,6 +49,28 @@ class TestDatasetsList:
fixture.
"""

def filter_setup(self, auth_id: Optional[str]):
"""Set up SQLAlchemy for filter_query unit tests.
This generates the basic query to allow generating
filter expressions.
"""
aliases = {
Metadata.METALOG: aliased(Metadata),
Metadata.SERVER: aliased(Metadata),
Metadata.GLOBAL: aliased(Metadata),
Metadata.USER: aliased(Metadata),
}
query = Database.db_session.query(Dataset)
for key, table in aliases.items():
terms = [table.dataset_ref == Dataset.id, table.key == key]
if key == Metadata.USER:
if not auth_id:
continue
terms.append(table.user_id == auth_id)
query = query.outerjoin(table, and_(*terms))
return aliases, query

@pytest.fixture()
def query_as(
self, client, server_config, more_datasets, provide_metadata, get_token_func
@@ -470,21 +494,15 @@ def test_get_repeat_keys(self, query_as):
(["dataset.name:fio"], "datasets.name = 'fio'"),
(
["dataset.metalog.pbench.script:fio"],
"dataset_metadata.key = 'metalog' "
"AND dataset_metadata.value[['pbench', 'script']] = 'fio'",
"dataset_metadata_1.value[['pbench', 'script']] = 'fio'",
),
(
["user.d.f:1"],
"dataset_metadata.key = 'user' AND dataset_metadata.value[['d', "
"'f']] = '1' AND dataset_metadata.user_id = '3'",
"dataset_metadata_4.value[['d', 'f']] = '1'",
),
(
["dataset.name:~fio", "^global.x:1", "^user.y:~yes"],
"(datasets.name LIKE '%' || 'fio' || '%') AND "
"(dataset_metadata.key = 'global' AND "
"dataset_metadata.value[['x']] = '1' OR dataset_metadata.key "
"= 'user' AND ((dataset_metadata.value[['y']]) LIKE '%' || "
"'yes' || '%') AND dataset_metadata.user_id = '3')",
"(datasets.name LIKE '%' || 'fio' || '%') AND (dataset_metadata_3.value[['x']] = '1' OR ((dataset_metadata_4.value[['y']]) LIKE '%' || 'yes' || '%'))",
),
(
["dataset.uploaded:~2000"],
@@ -503,14 +521,58 @@ def test_filter_query(self, monkeypatch, client, filters, expected):
lambda: DRB_USER_ID,
)
prefix = (
"SELECT datasets.access, datasets.id, datasets.name, "
"datasets.owner_id, datasets.resource_id, datasets.uploaded "
"FROM datasets LEFT OUTER JOIN dataset_metadata ON datasets.id "
"= dataset_metadata.dataset_ref WHERE "
"SELECT datasets.access, datasets.id, datasets.name, datasets.owner_id, datasets.resource_id, datasets.uploaded "
"FROM datasets LEFT OUTER JOIN dataset_metadata AS dataset_metadata_1 ON dataset_metadata_1.dataset_ref = datasets.id AND dataset_metadata_1.key = 'metalog' "
"LEFT OUTER JOIN dataset_metadata AS dataset_metadata_2 ON dataset_metadata_2.dataset_ref = datasets.id AND dataset_metadata_2.key = 'server' "
"LEFT OUTER JOIN dataset_metadata AS dataset_metadata_3 ON dataset_metadata_3.dataset_ref = datasets.id AND dataset_metadata_3.key = 'global' "
"LEFT OUTER JOIN dataset_metadata AS dataset_metadata_4 ON dataset_metadata_4.dataset_ref = datasets.id AND dataset_metadata_4.key = 'user' AND dataset_metadata_4.user_id = '3' WHERE "
)
query = DatasetsList.filter_query(
filters, Database.db_session.query(Dataset).outerjoin(Metadata)
aliases, query = self.filter_setup(DRB_USER_ID)
query = DatasetsList.filter_query(filters, aliases, query)
assert (
FLATTEN.sub(
" ",
str(query.statement.compile(compile_kwargs={"literal_binds": True})),
)
== prefix + expected
)

@pytest.mark.parametrize(
"filters,expected",
[
(["dataset.name:fio"], "datasets.name = 'fio'"),
(
["dataset.metalog.pbench.script:fio"],
"dataset_metadata_1.value[['pbench', 'script']] = 'fio'",
),
(
["dataset.name:~fio", "^global.x:1"],
"(datasets.name LIKE '%' || 'fio' || '%') AND dataset_metadata_3.value[['x']] = '1'",
),
(
["dataset.uploaded:~2000"],
"(CAST(datasets.uploaded AS VARCHAR) LIKE '%' || '2000' || '%')",
),
],
)
def test_filter_query_noauth(self, monkeypatch, client, filters, expected):
"""Test generation of Metadata value filters
Use the filter_query method directly to verify SQL generation from sets
of metadata filter expressions.
"""
monkeypatch.setattr(
"pbench.server.api.resources.datasets_list.Auth.get_current_user_id",
lambda: None,
)
prefix = (
"SELECT datasets.access, datasets.id, datasets.name, datasets.owner_id, datasets.resource_id, datasets.uploaded "
"FROM datasets LEFT OUTER JOIN dataset_metadata AS dataset_metadata_1 ON dataset_metadata_1.dataset_ref = datasets.id AND dataset_metadata_1.key = 'metalog' "
"LEFT OUTER JOIN dataset_metadata AS dataset_metadata_2 ON dataset_metadata_2.dataset_ref = datasets.id AND dataset_metadata_2.key = 'server' "
"LEFT OUTER JOIN dataset_metadata AS dataset_metadata_3 ON dataset_metadata_3.dataset_ref = datasets.id AND dataset_metadata_3.key = 'global' WHERE "
)
aliases, query = self.filter_setup(None)
query = DatasetsList.filter_query(filters, aliases, query)
assert (
FLATTEN.sub(
" ",
@@ -527,10 +589,9 @@ def test_user_no_auth(self, monkeypatch, db_session):
"pbench.server.api.resources.datasets_list.Auth.get_current_user_id",
lambda: None,
)
aliases, query = self.filter_setup(None)
with pytest.raises(APIAbort) as e:
DatasetsList.filter_query(
["user.foo:1"], Database.db_session.query(Dataset).outerjoin(Metadata)
)
DatasetsList.filter_query(["user.foo:1"], aliases, query)
assert e.value.http_status == HTTPStatus.UNAUTHORIZED

@pytest.mark.parametrize(
@@ -548,10 +609,9 @@ def test_filter_errors(self, monkeypatch, db_session, meta, error):
"pbench.server.api.resources.datasets_list.Auth.get_current_user_id",
lambda: None,
)
aliases, query = self.filter_setup(None)
with pytest.raises(APIAbort) as e:
DatasetsList.filter_query(
[meta], Database.db_session.query(Dataset).outerjoin(Metadata)
)
DatasetsList.filter_query([meta], aliases, query)
assert e.value.http_status == error

@pytest.mark.parametrize(
