
Fix /datasets?filter to select across namespaces #3359

Merged
merged 5 commits on Mar 27, 2023
Changes from 4 commits
117 changes: 80 additions & 37 deletions lib/pbench/server/api/resources/datasets_list.py
@@ -7,7 +7,8 @@
from flask.wrappers import Request, Response
from sqlalchemy import and_, cast, or_, String
from sqlalchemy.exc import ProgrammingError, StatementError
from sqlalchemy.orm import Query
from sqlalchemy.orm import aliased, Query
from sqlalchemy.sql.expression import Alias

from pbench.server import JSON, JSONOBJECT, OperationCode, PbenchServerConfig
from pbench.server.api.resources import (
@@ -134,7 +135,9 @@ def get_paginated_obj(
return items, paginated_result

@staticmethod
def filter_query(filters: list[str], query: Query) -> Query:
def filter_query(
filters: list[str], aliases: dict[str, Alias], query: Query
) -> Query:
"""Provide Metadata filtering for datasets.

Add SQLAlchemy filters to fulfill the intent of a series of filter
@@ -231,9 +234,7 @@ def filter_query(filters: list[str], query: Query) -> Query:
raise APIAbort(HTTPStatus.BAD_REQUEST, str(MetadataBadKey(k)))
keys = k.split(".")
native_key = keys.pop(0).lower()
terms = []
use_dataset = False
user_private = None
filter = None

if native_key == Metadata.DATASET:
second = keys[0].lower()
@@ -258,8 +259,7 @@
raise APIAbort(
HTTPStatus.BAD_REQUEST, str(MetadataBadKey(k))
) from e
use_dataset = True
terms = [column.contains(v) if contains else column == v]
filter = column.contains(v) if contains else column == v
elif native_key == Metadata.USER:
# The user namespace requires special handling because the
# values are always qualified by the owning user rather than
@@ -270,17 +270,12 @@
HTTPStatus.UNAUTHORIZED,
f"Metadata key {k} cannot be used by an unauthenticated client",
)
user_private = [Metadata.user_id == user_id]

if not use_dataset:
expression = Metadata.value[keys].as_string()
terms = [
Metadata.key == native_key,
expression.contains(v) if contains else expression == v,
]
if user_private:
terms.extend(user_private)
filter = and_(*terms)

# NOTE: We don't want to *evaluate* the filter expression here, so
# check explicitly for None.
if filter is None:
expression = aliases[native_key].value[keys].as_string()
filter = expression.contains(v) if contains else expression == v

if combine_or:
or_list.append(filter)
@@ -327,14 +322,8 @@ def accumulate(self, aggregate: JSONOBJECT, key: str, value: Any):
def keyspace(self, query: Query) -> JSONOBJECT:
"""Aggregate the dataset metadata keyspace

Run the query we've compiled, but instead of returning Dataset proxies,
we only want the metadata key/value pairs we've selected.

NOTE: The SQL left outer join returns a row for each row in the "left"
table (Dataset) even if there is no matching foreign key in the "right"
table (Metadata). This means a dataset with no metadata will result in
a join row here with key and value of None. The `elif` in the loop will
silently ignore rows with a null key to handle this case.
Run the query we've compiled, and process the metadata collections
attached to each dataset.

Args:
query: The basic filtered SQLAlchemy query object
@@ -345,14 +334,18 @@ def keyspace(self, query: Query) -> JSONOBJECT:
aggregate: JSONOBJECT = {
"dataset": {c.name: None for c in Dataset.__table__._columns}
}
list = query.with_entities(Metadata.key, Metadata.value).all()
for k, v in list:
# "metalog" is a top-level key in the Metadata schema, but we
# report it as a sub-key of "dataset".
if k == Metadata.METALOG:
self.accumulate(aggregate["dataset"], k, v)
elif k:
self.accumulate(aggregate, k, v)

Database.dump_query(query, current_app.logger)
Member:
The effect of this is conditional upon DEBUG-level logging being enabled, right? It would be nice if "debug" appeared in its name....

OK, having read further, I see that you've enhanced this to work for non-DEBUG-level; in that case, I humbly request that you explicitly include the third argument here, which will make it more obvious to the reader that it's a "debug thing".

Member Author:
It's a debug thing until it's not. The default behavior of this method hasn't changed. I just extended it to allow externally overriding to info for testing. I didn't see any point in renaming, and adding the default argument here is redundant.

Member:
Adding the default argument here makes it clear to the reader that the function is intended to be active only for debugging (otherwise, this behavior is implicit, and the concerned reader has to go find the definition of dump_query() to figure that out). (We can keep the argument's default value for compatibility with other, existing code; but it would be good from a code-as-documentation perspective to supply the value explicitly here.)
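For illustration only (not part of the diff), the explicit form being requested here would read something like:

    from logging import DEBUG  # stdlib constant; same value as the parameter's default

    # Passing the level explicitly makes the "only when DEBUG is enabled" behavior
    # visible at the call site without reading dump_query()'s definition.
    Database.dump_query(query, current_app.logger, DEBUG)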


datasets = query.all()
for d in datasets:
for m in d.metadatas:
# "metalog" is a top-level key in the Metadata schema, but we
# report it as a sub-key of "dataset".
if m.key == Metadata.METALOG:
self.accumulate(aggregate["dataset"], m.key, m.value)
else:
self.accumulate(aggregate, m.key, m.value)
Comment on lines +345 to +348
Member:
Alternately,

                a = aggregate["dataset"] if m.key == Metadata.METALOG else aggregate
                self.accumulate(a, m.key, m.value)

Member Author:
Yeah, but I like the explicit if better here than the local variable, especially when I'm pretty sure black wouldn't let me put it on one line anyway.

Member (@webbnh), Mar 24, 2023:
If you really like it better, that's fine. (Myself, I prefer a common call point with conditional data over multiple conditional call points.) However, I checked with Mr. Black before I posted it, and he said it was fine.
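For reference, a small runnable sketch of the two shapes under discussion; the accumulate() below is a simplified stand-in for DatasetsList.accumulate(), and the metadata row is hypothetical:

    # Simplified stand-in for DatasetsList.accumulate(): just record the key.
    def accumulate(target: dict, key: str, value) -> None:
        target.setdefault(key, value)

    aggregate = {"dataset": {}}
    key, value = "metalog", {"pbench": {"script": "fio"}}  # hypothetical metadata row

    # Shape merged in this PR: explicit branch per target dict.
    if key == "metalog":
        accumulate(aggregate["dataset"], key, value)
    else:
        accumulate(aggregate, key, value)

    # Reviewer's alternative: choose the target dict once, with a single call point.
    target = aggregate["dataset"] if key == "metalog" else aggregate
    accumulate(target, key, value)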

return aggregate

def datasets(self, request: Request, json: JSONOBJECT, query: Query) -> JSONOBJECT:
@@ -416,9 +409,60 @@ def _get(
A JSON response containing the paginated query result
"""
json = params.query
auth_id = Auth.get_current_user_id()

# Build a SQLAlchemy Query object expressing all of our constraints
query = Database.db_session.query(Dataset).outerjoin(Metadata)

"""SQL notes:

We want to be able to query across all four metadata key namespaces,
and a SELECT WHERE clause can only match within individual rows.
That is, if we look for "value1 = 'a' and value2 = 'b'", we won't
find a match with a single LEFT JOIN as the matched Metadata is
distributed across separate rows in the join table.

The way we get around this is ugly and inflexible, but I'm unable to
find a better way. We actually JOIN against Metadata four separate
times; all are constrained by the Dataset foreign key, and each is
additionally constrained by the primary Metadata.key value. (And the
"user" namespace is additionally constrained by the authorized user
ID, and omitted if we're not authenticated.)

This results in a join table with a JSON value column for each of the
tables, so that a single SELECT WHERE can match against all four of the
namespaces on a single row in the join table. In order to be able to
access the duplicate Metadata.value columns, we first create some
SQL name aliases. What we'll end up with, before we start adding our
filters, is something (simplified) like:

Dataset    mtable            stable            gtable              utable
---------  ----------------  ----------------  ------------------  ------------------
drb        {                 {                 {                   {
             "pbench": {       "origin": {       "dashboard": {      "dashboard": {
             }                 }                 }                   }
           }                 }                 }                   }
test       {                 {                 {                   {
             "pbench": {       "origin": {       "dashboard": {      "dashboard": {
             }                 }                 }                   }
           }                 }                 }                   }
"""
aliases = {
Metadata.METALOG: aliased(Metadata),
Metadata.SERVER: aliased(Metadata),
Metadata.GLOBAL: aliased(Metadata),
Metadata.USER: aliased(Metadata),
}
query = Database.db_session.query(Dataset)
for key, table in aliases.items():
terms = [table.dataset_ref == Dataset.id, table.key == key]
if key == Metadata.USER:
if auth_id:
terms.append(table.user_id == auth_id)
else:
continue
current_app.logger.info("Adding JOIN {}", key)
query = query.outerjoin(table, and_(*terms))

if "start" in json and "end" in json:
query = query.filter(Dataset.uploaded.between(json["start"], json["end"]))
elif "start" in json:
@@ -428,7 +472,7 @@
if "name" in json:
query = query.filter(Dataset.name.contains(json["name"]))
if "filter" in json:
query = self.filter_query(json["filter"], query)
query = self.filter_query(json["filter"], aliases, query)

# The "mine" filter allows queries for datasets that are (true) or
# aren't (false) owned by the authenticated user. In the absence of
Expand All @@ -441,7 +485,6 @@ def _get(
HTTPStatus.BAD_REQUEST,
"'owner' and 'mine' filters cannot be used together",
)
auth_id = Auth.get_current_user_id()
if not auth_id:
raise APIAbort(
HTTPStatus.BAD_REQUEST, "'mine' filter requires authentication"
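For illustration (not part of the diff): a minimal standalone sketch of the aliased multi-join technique the loop above builds, restricted to two of the four namespaces for brevity. The import paths are assumptions; the model attributes (dataset_ref, key, value, GLOBAL, USER) are the ones used in the diff.

    from sqlalchemy import and_
    from sqlalchemy.orm import aliased

    # Import paths assumed here; Dataset and Metadata are the models used above.
    from pbench.server.database.database import Database
    from pbench.server.database.models.datasets import Dataset, Metadata

    # One Metadata alias per namespace, each joined on its own key, so a single
    # WHERE clause can see values from different namespaces on the same row.
    gtable = aliased(Metadata)
    utable = aliased(Metadata)
    query = (
        Database.db_session.query(Dataset)
        .outerjoin(
            gtable, and_(gtable.dataset_ref == Dataset.id, gtable.key == Metadata.GLOBAL)
        )
        .outerjoin(
            utable, and_(utable.dataset_ref == Dataset.id, utable.key == Metadata.USER)
        )
    )

    # With a single plain .outerjoin(Metadata), this conjunction could never match:
    # the "global" and "user" values live on separate rows of the join.
    query = query.filter(
        gtable.value[["x"]].as_string() == "1",
        utable.value[["y"]].as_string() == "yes",
    )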
9 changes: 5 additions & 4 deletions lib/pbench/server/database/database.py
@@ -93,16 +93,17 @@ def init_db(server_config: PbenchServerConfig, logger: Logger):
)

@staticmethod
def dump_query(query: Query, logger: Logger):
def dump_query(query: Query, logger: Logger, level: int = DEBUG):
"""Dump a fully resolved SQL query if DEBUG logging is enabled

Args:
query: A SQLAlchemy Query object
logger: A Python logger object
level: [DEBUG] level at which to log
"""
if logger.isEnabledFor(DEBUG):
if logger.isEnabledFor(level):
try:
q_str = query.statement.compile(compile_kwargs={"literal_binds": True})
logger.debug("QUERY {}", q_str)
logger.log(level, "QUERY {}", q_str)
except Exception as e:
logger.debug("Can't compile query {}: {}", query, e)
logger.log(level, "Can't compile query {}: {}", query, e)
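As a usage sketch of the new level parameter (the INFO override, the query, and the import paths here are illustrative, not taken from the diff): callers keep the DEBUG default, while a test can raise the level so the compiled SQL is emitted without enabling DEBUG for the whole server.

    from logging import INFO

    from flask import current_app

    # Import paths below are assumptions.
    from pbench.server.database.database import Database
    from pbench.server.database.models.datasets import Dataset

    query = Database.db_session.query(Dataset)

    # Default: the compiled SQL is dumped only if the logger is enabled for DEBUG.
    Database.dump_query(query, current_app.logger)

    # Hypothetical test-time override: emit the compiled SQL at INFO instead.
    Database.dump_query(query, current_app.logger, INFO)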
109 changes: 87 additions & 22 deletions lib/pbench/test/unit/server/test_datasets_list.py
@@ -1,11 +1,13 @@
import datetime
from http import HTTPStatus
import re
from typing import Optional

import pytest
import requests
from sqlalchemy import and_
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.orm import Query
from sqlalchemy.orm import aliased, Query

from pbench.server import JSON, JSONARRAY, JSONOBJECT
from pbench.server.api.resources import APIAbort
@@ -47,6 +49,29 @@ class TestDatasetsList:
fixture.
"""

def filter_setup(self, auth_id: Optional[str]):
"""Set up SQLAlchemy for filter_query unit tests.

This generates the basic query to allow generating
filter expressions.
"""
aliases = {
Metadata.METALOG: aliased(Metadata),
Metadata.SERVER: aliased(Metadata),
Metadata.GLOBAL: aliased(Metadata),
Metadata.USER: aliased(Metadata),
}
query = Database.db_session.query(Dataset)
for key, table in aliases.items():
terms = [table.dataset_ref == Dataset.id, table.key == key]
if key == Metadata.USER:
if auth_id:
terms.append(table.user_id == auth_id)
else:
continue
query = query.outerjoin(table, and_(*terms))
return aliases, query

@pytest.fixture()
def query_as(
self, client, server_config, more_datasets, provide_metadata, get_token_func
@@ -470,21 +495,15 @@ def test_get_repeat_keys(self, query_as):
(["dataset.name:fio"], "datasets.name = 'fio'"),
(
["dataset.metalog.pbench.script:fio"],
"dataset_metadata.key = 'metalog' "
"AND dataset_metadata.value[['pbench', 'script']] = 'fio'",
"dataset_metadata_1.value[['pbench', 'script']] = 'fio'",
),
(
["user.d.f:1"],
"dataset_metadata.key = 'user' AND dataset_metadata.value[['d', "
"'f']] = '1' AND dataset_metadata.user_id = '3'",
"dataset_metadata_4.value[['d', 'f']] = '1'",
),
(
["dataset.name:~fio", "^global.x:1", "^user.y:~yes"],
"(datasets.name LIKE '%' || 'fio' || '%') AND "
"(dataset_metadata.key = 'global' AND "
"dataset_metadata.value[['x']] = '1' OR dataset_metadata.key "
"= 'user' AND ((dataset_metadata.value[['y']]) LIKE '%' || "
"'yes' || '%') AND dataset_metadata.user_id = '3')",
"(datasets.name LIKE '%' || 'fio' || '%') AND (dataset_metadata_3.value[['x']] = '1' OR ((dataset_metadata_4.value[['y']]) LIKE '%' || 'yes' || '%'))",
),
(
["dataset.uploaded:~2000"],
@@ -503,14 +522,62 @@ def test_filter_query(self, monkeypatch, client, filters, expected):
lambda: DRB_USER_ID,
)
prefix = (
"SELECT datasets.access, datasets.id, datasets.name, "
"datasets.owner_id, datasets.resource_id, datasets.uploaded "
"FROM datasets LEFT OUTER JOIN dataset_metadata ON datasets.id "
"= dataset_metadata.dataset_ref WHERE "
"SELECT datasets.access, datasets.id, datasets.name, datasets.owner_id, datasets.resource_id, datasets.uploaded "
"FROM datasets LEFT OUTER JOIN dataset_metadata AS dataset_metadata_1 ON dataset_metadata_1.dataset_ref = datasets.id AND dataset_metadata_1.key = 'metalog' "
"LEFT OUTER JOIN dataset_metadata AS dataset_metadata_2 ON dataset_metadata_2.dataset_ref = datasets.id AND dataset_metadata_2.key = 'server' "
"LEFT OUTER JOIN dataset_metadata AS dataset_metadata_3 ON dataset_metadata_3.dataset_ref = datasets.id AND dataset_metadata_3.key = 'global' "
"LEFT OUTER JOIN dataset_metadata AS dataset_metadata_4 ON dataset_metadata_4.dataset_ref = datasets.id AND dataset_metadata_4.key = 'user' AND dataset_metadata_4.user_id = '3' WHERE "
)
query = DatasetsList.filter_query(
filters, Database.db_session.query(Dataset).outerjoin(Metadata)
aliases, query = self.filter_setup(DRB_USER_ID)
query = DatasetsList.filter_query(filters, aliases, query)
assert (
FLATTEN.sub(
" ",
str(query.statement.compile(compile_kwargs={"literal_binds": True})),
)
== prefix + expected
)

@pytest.mark.parametrize(
"filters,expected",
[
(["dataset.name:fio"], "datasets.name = 'fio'"),
(
["dataset.metalog.pbench.script:fio"],
"dataset_metadata_1.value[['pbench', 'script']] = 'fio'",
),
(
["dataset.name:~fio", "^global.x:1"],
"(datasets.name LIKE '%' || 'fio' || '%') AND dataset_metadata_3.value[['x']] = '1'",
),
(
["dataset.uploaded:~2000"],
"(CAST(datasets.uploaded AS VARCHAR) LIKE '%' || '2000' || '%')",
),
],
)
def test_filter_query_noauth(self, monkeypatch, client, filters, expected):
"""Test generation of Metadata value filters

Use the filter_query method directly to verify SQL generation from sets
of metadata filter expressions.
"""
monkeypatch.setattr(
"pbench.server.api.resources.datasets_list.Auth.get_current_user_id",
lambda: None,
)
prefix = (
"SELECT datasets.access, datasets.id, datasets.name, datasets.owner_id, datasets.resource_id, datasets.uploaded "
"FROM datasets LEFT OUTER JOIN dataset_metadata AS dataset_metadata_1 ON dataset_metadata_1.dataset_ref = datasets.id AND dataset_metadata_1.key = 'metalog' "
"LEFT OUTER JOIN dataset_metadata AS dataset_metadata_2 ON dataset_metadata_2.dataset_ref = datasets.id AND dataset_metadata_2.key = 'server' "
"LEFT OUTER JOIN dataset_metadata AS dataset_metadata_3 ON dataset_metadata_3.dataset_ref = datasets.id AND dataset_metadata_3.key = 'global' WHERE "
)

# SELECT datasets.access, datasets.id, datasets.name, datasets.owner_id, datasets.resource_id, datasets.uploaded FROM datasets LEFT OUTER JOIN dataset_metadata AS dataset_metadata_1 ON dataset_metadata_1.dataset_ref = datasets.id AND dataset_metadata_1.key = 'metalog' LEFT OUTER JOIN dataset_metadata AS dataset_metadata_2 ON dataset_metadata_2.dataset_ref = datasets.id AND dataset_metadata_2.key = 'server' LEFT OUTER JOIN dataset_metadata AS dataset_metadata_3 ON dataset_metadata_3.dataset_ref = datasets.id AND dataset_metadata_3.key = 'global' WHERE datasets.name = 'fio'
# SELECT datasets.access, datasets.id, datasets.name, datasets.owner_id, datasets.resource_id, datasets.uploaded FROM datasets LEFT OUTER JOIN dataset_metadata AS dataset_metadata_1 ON dataset_metadata_1.dataset_ref = datasets.id AND dataset_metadata_1.key = 'metalog' LEFT OUTER JOIN dataset_metadata AS dataset_metadata_2 ON dataset_metadata_2.dataset_ref = datasets.id AND dataset_metadata_2.key = 'server' LEFT OUTER JOIN dataset_metadata AS dataset_metadata_3 ON dataset_metadata_3.dataset_ref = datasets.id AND dataset_metadata_3.key = 'global' LEFT OUTER JOIN dataset_metadata AS dataset_metadata_4 ON dataset_metadata_4.dataset_ref = datasets.id AND dataset_metadata_4.key = 'user' AND dataset_metadata_4.user_id = '3' WHERE datasets.name = 'fio'

aliases, query = self.filter_setup(None)
query = DatasetsList.filter_query(filters, aliases, query)
assert (
FLATTEN.sub(
" ",
@@ -527,10 +594,9 @@ def test_user_no_auth(self, monkeypatch, db_session):
"pbench.server.api.resources.datasets_list.Auth.get_current_user_id",
lambda: None,
)
aliases, query = self.filter_setup(None)
with pytest.raises(APIAbort) as e:
DatasetsList.filter_query(
["user.foo:1"], Database.db_session.query(Dataset).outerjoin(Metadata)
)
DatasetsList.filter_query(["user.foo:1"], aliases, query)
assert e.value.http_status == HTTPStatus.UNAUTHORIZED

@pytest.mark.parametrize(
@@ -548,10 +614,9 @@ def test_filter_errors(self, monkeypatch, db_session, meta, error):
"pbench.server.api.resources.datasets_list.Auth.get_current_user_id",
lambda: None,
)
aliases, query = self.filter_setup(None)
with pytest.raises(APIAbort) as e:
DatasetsList.filter_query(
[meta], Database.db_session.query(Dataset).outerjoin(Metadata)
)
DatasetsList.filter_query([meta], aliases, query)
assert e.value.http_status == error

@pytest.mark.parametrize(