diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml
index 98485d9..041de38 100644
--- a/.github/workflows/python-app.yml
+++ b/.github/workflows/python-app.yml
@@ -15,7 +15,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@v2
+ - uses: actions/checkout@v3
- name: Set up Python 3.10
uses: actions/setup-python@v2
with:
diff --git a/.gitignore b/.gitignore
index 9c5ed95..b77136a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,4 +3,6 @@
__pycache__
*.egg-info
.vscode
-.eggs
\ No newline at end of file
+.eggs
+build
+.pytest_cache
diff --git a/README.md b/README.md
index 2ea938d..37de3cd 100644
--- a/README.md
+++ b/README.md
@@ -40,13 +40,34 @@ cd server
mkdir -p logs
export DB_URL=postgresql+asyncpg://npg_rw:$PASS@npg_porch_db:$PORT/$DATABASE
export DB_SCHEMA='non_default'
-uvicorn main:app --host 0.0.0.0 --port 8080 --reload --log-config logging.json
+uvicorn npg.main:app --host 0.0.0.0 --port 8080 --reload --log-config logging.json
```
and open your browser at `http://localhost:8080` to see links to the docs.
The server will not start without `DB_URL` in the environment
+## Running in production
+
+When you want HTTPS, logging and all that jazz:
+
+```bash
+uvicorn npg.main:app --workers 2 --host 0.0.0.0 --port 8080 --log-config ~/logging.json --ssl-keyfile ~/.ssh/key.pem --ssl-certfile ~/.ssh/cert.pem --ssl-ca-certs /usr/local/share/ca-certificates/institute_ca.crt
+```
+
+Consider running with nohup or similar.
+
+Some notes on arguments:
+
+- `--workers`: how many pre-forked server processes to run. As the API is asynchronous we should not need many, and each worker directly increases memory consumption.
+- `--host`: 0.0.0.0 binds the server to all network interfaces. Reliable but greedy in some situations.
+- `--log-config`: refers to a JSON file for the Python logging library. An example file is found in `/server/logging.json`. Uvicorn provides its own logging configuration via `uvicorn.access` and `uvicorn.error`. These may behave undesirably, and can be overridden in the JSON file with an alternate config. Likewise, fastapi logs to `fastapi` if that needs filtering. For logging to files, set `use_colors = False` in the relevant handlers, or shell colour settings will appear as garbage in the logs.
+- `--ssl-keyfile`: a PEM format key for the server certificate.
+- `--ssl-certfile`: a PEM format certificate for signing HTTPS communications.
+- `--ssl-ca-certs`: a CRT format certificate authority file that pleases picky clients. Uvicorn does not seem to find the system certificates automatically.
+
## Testing
```bash
@@ -71,11 +92,15 @@ Create a schema on a postgres server:
```bash
psql --host=npg_porch_db --port=$PORT --username=npg_admin --password -d postgres
+```
-CREATE SCHEMA npg_porch
+```sql
+CREATE SCHEMA npg_porch;
+SET search_path = npg_porch, public;
+GRANT USAGE ON SCHEMA npg_porch TO npgtest_ro, npgtest_rw;
```
-Then run a script that deploys the ORM to this schema
+The SET command makes the new schema visible _for one session only_ in the `\d*` commands you might use in psql. Then run a script that deploys the ORM to this schema:
```bash
DB=npg_porch
@@ -89,7 +114,6 @@ psql --host=npg_porch_db --port=$PORT --username=npg_admin --password -d $DB
Permissions must be granted to the npg_rw and npg_ro users to the newly created schema
```sql
-GRANT USAGE ON SCHEMA npg_porch TO npgtest_ro, npgtest_rw;
GRANT USAGE ON ALL SEQUENCES IN SCHEMA npg_porch TO npgtest_rw;
GRANT SELECT ON ALL TABLES IN SCHEMA npg_porch TO npgtest_ro;
@@ -98,6 +122,4 @@ GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA npg_porch TO npgtes
Note that granting usage on sequences is required to allow autoincrement columns to work during an insert. This is a trick of newer Postgres versions.
-It may prove necessary to `GRANT` to specific named tables and sequences. Under specific circumstances the `ALL TABLES` qualifier doesn't work.
-
Until token support is implemented, a row will need to be inserted manually into the token table. Otherwise none of the event logging works.
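+
+For example, a minimal sketch of creating such a row through the ORM, along the lines of the new `scripts/issue_token.py` (connection details here are placeholders):
+
+```python
+from sqlalchemy import create_engine
+from sqlalchemy.orm import sessionmaker
+
+from npg.porchdb.models import Token
+
+engine = create_engine(
+    'postgresql+psycopg2://npg_rw:PASS@npg_porch_db:PORT/npg_porch',
+    connect_args={'options': '-csearch_path=npg_porch'}
+)
+session = sessionmaker(bind=engine)()
+
+# A token with no associated pipeline acts as an admin (power user) token
+token = Token(description='Seqfarm host, admin')
+session.add(token)
+session.commit()
+print(token.token)
+
+session.close()
+engine.dispose()
+```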
diff --git a/docs/user_guide.md b/docs/user_guide.md
index e6ab199..73e50ba 100644
--- a/docs/user_guide.md
+++ b/docs/user_guide.md
@@ -18,7 +18,15 @@ Bash tools like `jq` and `jo` can be useful in working with the server, as all m
We have tried to make interactions with npg_porch as atomic as possible, so the data you send and the data you receive follow the same schema.
-Security is necessary in order to prevent accidental misuse of npg_porch. An authorisation token can be provided to you by the maintainers, which you will then use to enable each request. Not implemented yet!
+Security is necessary in order to prevent accidental misuse of npg_porch. An authorisation token can be obtained from the maintainers, and you will then use it to authorise each request.
+
+A note on HTTPS: client libraries like `requests`, certain GUIs and Firefox will try to verify the server's certificate authority. System-administered software is already configured correctly, but packages installed via conda or pip may need to be told which CA certificate to verify the server with, e.g. one contained in `/usr/share/ca-certificates/`. It may also be useful to unset `https_proxy` and `HTTPS_PROXY` in your environment.
+
+### Step 0 - get issued security tokens
+
+Access to the service is loosely controlled with authorisation tokens. You will be issued with an admin token that enables you to register pipelines, and further tokens for pipeline-specific communication. Please do not share the tokens, or use them for purposes other than their designated pipeline. This will help us to monitor pipeline reliability and quality of service. Authorisation is achieved by HTTP Bearer Token:
+
+`curl -L -H "Authorization: Bearer $TOKEN" https://$SERVER:$PORT`
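+
+If you prefer Python, the `requests` library mentioned above can make the same call; a minimal sketch (server, port, token and CA bundle path are placeholders):
+
+```python
+import requests
+
+token = 'REPLACE_WITH_YOUR_TOKEN'  # issued by the maintainers
+
+response = requests.get(
+    'https://SERVER:PORT/pipelines',
+    headers={'Authorization': f'Bearer {token}'},
+    # point requests at a CA bundle if your installation needs one
+    verify='/usr/share/ca-certificates/institute_ca.crt',
+)
+response.raise_for_status()
+```
+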
### Step 1 - register your pipeline with npg_porch
@@ -38,7 +46,7 @@ You can name your pipeline however you like, but the name must be unique, and be
}
```
-`url='npg_porch_server.sanger.ac.uk/pipelines'; curl -L -XPOST ${url} -H "content-type: application/json" -w " %{http_code}" -d @pipeline-def.json`
+`url="https://$SERVER:$PORT/pipelines"; curl -L -XPOST ${url} -H "content-type: application/json" -H "Authorization: Bearer $ADMIN_TOKEN" -w " %{http_code}" -d @pipeline-def.json`
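+
+A rough Python equivalent of the same registration request (server, port and token are placeholders):
+
+```python
+import json
+import requests
+
+admin_token = 'REPLACE_WITH_ADMIN_TOKEN'
+
+with open('pipeline-def.json') as f:
+    pipeline_def = json.load(f)
+
+response = requests.post(
+    'https://SERVER:PORT/pipelines',
+    json=pipeline_def,
+    headers={'Authorization': f'Bearer {admin_token}'},
+)
+assert response.status_code == 201
+```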
Keep this pipeline definition with your data, as you will need it to tell npg_porch which pipeline you are acting on.
@@ -110,9 +118,9 @@ Note that it is possible to run the same `task_input` with a different `pipeline
Now you want the pipeline to run once per specification, and so register the documents with npg_porch.
```bash
-url='npg_porch_server.sanger.ac.uk/tasks'
+url="https://$SERVER:$PORT/tasks"
for DOC in *.json; do
- response=$(curl -w '%{http_code}' -L -XPOST ${url} -H "content-type: application/json" -d @${DOC}`)
+ response=$(curl -w '%{http_code}' -L -XPOST ${url} -H "content-type: application/json" -H "Authorization: Bearer $TOKEN" -d @${DOC})
# parsing the response is left as an exercise for the reader...
if [[ "$response_code" ne 201]]; then
@@ -128,7 +136,7 @@ use HTTP::Request;
use LWP::UserAgent;
my $ua = LWP::UserAgent->new;
-my $request = HTTP::Request->new(POST => 'npg_porch_server.sanger.ac.uk/tasks');
+my $request = HTTP::Request->new(POST => "https://$SERVER:$PORT/tasks");
$request->content_type('application/json');
$request->header(Accept => 'application/json');
$request->content($DOC);
@@ -180,8 +188,8 @@ Supposing there are new tasks created every 24 hours, we then also need a client
Using the "claim" interface, you can ask npg_porch to earmark tasks that you intend to run. Others will remain unclaimed until this script or another claims them. Generally speaking, tasks are first-in first-out, so the first task you get if you claim one is the first unclaimed task npg_porch was told about.
```bash
-url='npg_porch_server.sanger.ac.uk/tasks/claim'
-response=$(curl -L -I -XPOST ${url} -H "content-type: application/json" -d @pipeline-def.json)
+url="https://$SERVER:$PORT/tasks/claim"
+response=$(curl -L -I -XPOST ${url} -H "content-type: application/json" -H "Authorization: Bearer $TOKEN" -d @pipeline-def.json)
```
Response body:
@@ -216,9 +224,10 @@ or
use JSON qw/decode_json/;
my $ua = LWP::UserAgent->new;
-my $request = HTTP::Request->new(POST => 'npg_porch_server.sanger.ac.uk/tasks/claim');
+my $request = HTTP::Request->new(POST => "https://$SERVER:$PORT/tasks/claim");
$request->content_type('application/json');
$request->header(Accept => 'application/json');
+$request->header(Authorization => "Bearer $TOKEN");
my $response = $ua->request($request);
if ($response->is_success) {
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..933ec91
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,33 @@
+[build-system]
+requires = ["setuptools>=61"]
+build-backend = "setuptools.build_meta"
+
+[project]
+name = "npg_porch"
+requires-python = ">=3.10"
+authors = [{name="Marina Gourtovaia", email="mg8@sanger.ac.uk"}, {name="Kieron Taylor", email="kt19@sanger.ac.uk"}]
+description = "API server for tracking unique workflow executions"
+readme = "README.md"
+license = {file = "LICENSE.md"}
+dependencies = [
+ "aiosqlite",
+ "asyncpg",
+ "fastapi",
+ "pydantic > 2.0.0",
+ "pysqlite3",
+ "psycopg2-binary",
+ "sqlalchemy >2",
+ "ujson",
+ "uvicorn",
+ "uuid"
+]
+dynamic = ["version"]
+
+[project.optional-dependencies]
+test = [
+ "pytest",
+ "pytest-asyncio",
+ "requests",
+ "flake8",
+ "httpx"
+]
diff --git a/server/deploy_schema.py b/scripts/deploy_schema.py
similarity index 65%
rename from server/deploy_schema.py
rename to scripts/deploy_schema.py
index 53d8eed..9f56d71 100644
--- a/server/deploy_schema.py
+++ b/scripts/deploy_schema.py
@@ -10,7 +10,12 @@
if schema_name is None:
schema_name = 'npg_porch'
-engine = sqlalchemy.create_engine(db_url)
+print(f'Deploying npg_porch tables to schema {schema_name}')
+
+engine = sqlalchemy.create_engine(
+ db_url,
+ connect_args={'options': f'-csearch_path={schema_name}'}
+)
npg.porchdb.models.Base.metadata.schema = schema_name
npg.porchdb.models.Base.metadata.create_all(engine)
diff --git a/scripts/issue_token.py b/scripts/issue_token.py
new file mode 100755
index 0000000..b31e4db
--- /dev/null
+++ b/scripts/issue_token.py
@@ -0,0 +1,75 @@
+#!/usr/bin/env python
+
+import argparse
+from sqlalchemy import create_engine, select
+from sqlalchemy.orm import sessionmaker
+from sqlalchemy.orm.exc import NoResultFound
+
+from npg.porchdb.models import Token, Pipeline
+
+parser = argparse.ArgumentParser(
+ description='Creates a token in the backend DB and returns it'
+)
+
+parser.add_argument(
+ '-H', '--host', help='Postgres host', required=True
+)
+parser.add_argument(
+ '-d', '--database', help='Postgres database', default='npg_porch'
+)
+parser.add_argument(
+ '-s', '--schema', help='Postgres schema', default='npg_porch'
+)
+parser.add_argument(
+ '-u', '--user', help='Postgres rw user', required=True
+)
+parser.add_argument(
+ '-p', '--password', help='Postgres rw password', required=True
+)
+parser.add_argument(
+ '-P', '--port', help='Postgres port', required=True
+)
+parser.add_argument(
+ '-n', '--pipeline', help='Pipeline name. If given, the token is associated with this pipeline; otherwise an admin (power user) token is created'
+)
+parser.add_argument(
+ '-D', '--description', help='Description of token purpose', required=True
+)
+
+args = parser.parse_args()
+
+
+db_url = f'postgresql+psycopg2://{args.user}:{args.password}@{args.host}:{args.port}/{args.database}'
+
+engine = create_engine(db_url, connect_args={'options': f'-csearch_path={args.schema}'})
+SessionFactory = sessionmaker(bind=engine)
+session = SessionFactory()
+
+token = None
+pipeline = None
+
+if args.pipeline:
+ try:
+ pipeline = session.execute(
+ select(Pipeline)
+ .where(Pipeline.name == args.pipeline)
+ ).scalar_one()
+ except NoResultFound:
+ raise Exception(
+ 'Pipeline with name {} not found in database'.format(args.pipeline)
+ )
+
+ token = Token(
+ pipeline=pipeline,
+ description=args.description
+ )
+else:
+ token = Token(description=args.description)
+
+session.add(token)
+session.commit()
+
+print(token.token)
+
+session.close()
+engine.dispose()
diff --git a/server/tests/__init__.py b/server/__init__.py
similarity index 100%
rename from server/tests/__init__.py
rename to server/__init__.py
diff --git a/server/main.py b/server/npg/main.py
similarity index 100%
rename from server/main.py
rename to server/npg/main.py
diff --git a/server/tests/fixtures/__init__.py b/server/npg/porch/auth/__init__.py
similarity index 100%
rename from server/tests/fixtures/__init__.py
rename to server/npg/porch/auth/__init__.py
diff --git a/server/npg/porch/auth/token.py b/server/npg/porch/auth/token.py
new file mode 100644
index 0000000..49cf89c
--- /dev/null
+++ b/server/npg/porch/auth/token.py
@@ -0,0 +1,45 @@
+# Copyright (C) 2022 Genome Research Ltd.
+#
+# Author: Kieron Taylor kt19@sanger.ac.uk
+# Author: Marina Gourtovaia mg8@sanger.ac.uk
+#
+# This file is part of npg_porch
+#
+# npg_porch is free software: you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the Free
+# Software Foundation; either version 3 of the License, or (at your option) any
+# later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
+# details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+
+import logging
+from fastapi import Depends
+from fastapi.security import HTTPBearer
+from fastapi import HTTPException
+
+from npg.porchdb.connection import get_CredentialsValidator
+from npg.porchdb.auth import CredentialsValidationException
+
+auth_scheme = HTTPBearer()
+
+async def validate(
+ creds = Depends(auth_scheme),
+ validator = Depends(get_CredentialsValidator)
+):
+
+ token = creds.credentials
+ p = None
+ try:
+ p = await validator.token2permission(token)
+ except CredentialsValidationException as e:
+ logger = logging.getLogger(__name__)
+ logger.warning(str(e))
+ raise HTTPException(status_code=403, detail="Invalid token")
+
+ return p
diff --git a/server/npg/porch/endpoints/pipelines.py b/server/npg/porch/endpoints/pipelines.py
index 426d8bf..d1afcd7 100644
--- a/server/npg/porch/endpoints/pipelines.py
+++ b/server/npg/porch/endpoints/pipelines.py
@@ -20,41 +20,61 @@
from fastapi import APIRouter, HTTPException, Depends
import logging
-from typing import List, Optional
import re
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm.exc import NoResultFound
from starlette import status
from npg.porch.models.pipeline import Pipeline
+from npg.porch.models.permission import RolesEnum
from npg.porchdb.connection import get_DbAccessor
+from npg.porch.auth.token import validate
+
router = APIRouter(
prefix="/pipelines",
- tags=["pipelines"]
+ tags=["pipelines"],
+ responses={
+ status.HTTP_403_FORBIDDEN: {"description": "Not authorised"},
+ status.HTTP_500_INTERNAL_SERVER_ERROR: {"description": "Unexpected error"}
+ }
)
+
@router.get(
"/",
- response_model=List[Pipeline],
+ response_model=list[Pipeline],
summary="Get information about all pipelines.",
- description="Get all pipelines as a list. A uri and/or version filter can be used."
+ description='''
+ Returns a list of pydantic Pipeline models.
+ A uri and/or version filter can be used.
+ A valid token issued for any pipeline is required for authorisation.'''
)
async def get_pipelines(
- uri: Optional[str] = None,
- version: Optional[str] = None,
- db_accessor=Depends(get_DbAccessor)
-) -> List[Pipeline]:
+ uri: str | None = None,
+ version: str | None = None,
+ db_accessor=Depends(get_DbAccessor),
+ permissions=Depends(validate)
+) -> list[Pipeline]:
+
return await db_accessor.get_all_pipelines(uri, version)
+
@router.get(
"/{pipeline_name}",
response_model=Pipeline,
responses={status.HTTP_404_NOT_FOUND: {"description": "Not found"}},
summary="Get information about one pipeline.",
+ description='''
+ Returns a single pydantic Pipeline model if found.
+ A valid token issued for any pipeline is required for authorisation.'''
)
-async def get_pipeline(pipeline_name: str,
- db_accessor=Depends(get_DbAccessor)):
+async def get_pipeline(
+ pipeline_name: str,
+ db_accessor=Depends(get_DbAccessor),
+ permissions=Depends(validate)
+) -> Pipeline:
+
pipeline = None
try:
pipeline = await db_accessor.get_pipeline_by_name(name=pipeline_name)
@@ -63,18 +83,31 @@ async def get_pipeline(pipeline_name: str,
detail=f"Pipeline '{pipeline_name}' not found")
return pipeline
+
@router.post(
"/",
response_model=Pipeline,
- summary="Create one pipeline record.",
status_code=status.HTTP_201_CREATED,
responses={
status.HTTP_201_CREATED: {"description": "Pipeline was created"},
status.HTTP_400_BAD_REQUEST: {"description": "Insufficient pipeline properties provided"},
status.HTTP_409_CONFLICT: {"description": "Pipeline already exists"}
- }
+ },
+ summary="Create one pipeline record.",
+ description='''
+ Using JSON data in the request, creates a new pipeline record.
+ A valid special power user token is required for authorisation.'''
)
-async def create_pipeline(pipeline: Pipeline, db_accessor=Depends(get_DbAccessor)) -> Pipeline:
+async def create_pipeline(
+ pipeline: Pipeline,
+ db_accessor=Depends(get_DbAccessor),
+ permissions=Depends(validate)
+) -> Pipeline:
+
+ if permissions.role != RolesEnum.POWER_USER:
+ logging.error(f"Role {RolesEnum.POWER_USER} is required")
+ raise HTTPException(status_code=403)
+
new_pipeline = None
try:
new_pipeline = await db_accessor.create_pipeline(pipeline)
diff --git a/server/npg/porch/endpoints/tasks.py b/server/npg/porch/endpoints/tasks.py
index 208a5d2..3a57d2a 100644
--- a/server/npg/porch/endpoints/tasks.py
+++ b/server/npg/porch/endpoints/tasks.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2021, 2022 Genome Research Ltd.
+# Copyright (C) 2021, 2022, 2023 Genome Research Ltd.
#
# Author: Kieron Taylor kt19@sanger.ac.uk
# Author: Marina Gourtovaia mg8@sanger.ac.uk
@@ -18,66 +18,94 @@
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
-from fastapi import APIRouter, HTTPException, Depends
-from pydantic import PositiveInt
-from typing import List
+import logging
+from typing import Annotated
+
+from fastapi import APIRouter, Depends, HTTPException, Query
+from npg.porch.auth.token import validate
+from npg.porch.models.permission import PermissionValidationException
+from npg.porch.models.pipeline import Pipeline
+from npg.porch.models.task import Task, TaskStateEnum
+from npg.porchdb.connection import get_DbAccessor
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm.exc import NoResultFound
from starlette import status
-from npg.porch.models.pipeline import Pipeline
-from npg.porch.models.task import Task
-from npg.porchdb.connection import get_DbAccessor
+def _validate_request(permission, pipeline):
+
+ try:
+ permission.validate_pipeline(pipeline)
+ except PermissionValidationException as e:
+ logger = logging.getLogger(__name__)
+ logger.warning(str(e))
+ raise HTTPException(
+ status_code=403,
+ detail=("Given credentials cannot be used for"
+ f" pipeline '{pipeline.name}'")
+ )
+
+ pass
+
router = APIRouter(
prefix="/tasks",
- tags=["tasks"]
+ tags=["tasks"],
+ responses={
+ status.HTTP_403_FORBIDDEN: {"description": "Not authorised"},
+ status.HTTP_500_INTERNAL_SERVER_ERROR: {"description": "Unexpected error"}
+ }
)
+
@router.get(
"/",
- response_model=List[Task],
- summary="Returns all tasks.",
- description="Return all tasks. A filter will be applied if used in the query."
+ response_model=list[Task],
+ summary="Returns all tasks, and can be filtered to task status or pipeline name",
+ description='''
+ Return all tasks. The list of tasks can be filtered by supplying a pipeline
+ name and/or task status'''
)
-async def get_tasks(db_accessor=Depends(get_DbAccessor)):
- return await db_accessor.get_tasks()
-
-#@router.get(
-# "/{task_name}",
-# response_model=Task,
-# summary="Get one task.",
-# description="Get one task using its unique name."
-#)
-#def get_task(task_name: str):
-# return Task(name=task_name)
+async def get_tasks(
+ pipeline_name: str | None = None,
+ status: TaskStateEnum | None = None,
+ db_accessor=Depends(get_DbAccessor),
+ permission=Depends(validate)
+) -> list[Task]:
+ return await db_accessor.get_tasks(pipeline_name=pipeline_name, task_status=status)
+
@router.post(
"/",
response_model=Task,
- summary="Creates one task.",
status_code=status.HTTP_201_CREATED,
responses={
status.HTTP_201_CREATED: {"description": "Task creation was successful"},
- status.HTTP_404_NOT_FOUND: {"description": "The pipeline for this task is invalid"},
status.HTTP_409_CONFLICT: {"description": "A task with the same signature already exists"}
- }
-)
-async def create_task(task: Task, db_accessor=Depends(get_DbAccessor)):
- """
+ },
+ summary="Creates one task record.",
+ description='''
Given a Task object, creates a database record for it and returns
- the same object with status 201 'Created'
+ the same object, the response HTTP status is 201 'Created'. The
+ new task is assigned pending status, i.e. it becomes available for claiming.
The pipeline specified by the `pipeline` attribute of the Task object
- should exist. If it does not exist, return status 404 'Not found' and
- an error.
+ should exist. If it does not exist, return status 404 'Not found'.'''
+)
+async def create_task(
+ task: Task,
+ db_accessor=Depends(get_DbAccessor),
+ permission=Depends(validate)
+) -> Task:
- Errors if task status is not PENDING.
- """
+ _validate_request(permission, task.pipeline)
created_task = None
try:
- created_task = await db_accessor.create_task(token_id=1, task=task)
+ created_task = await db_accessor.create_task(
+ token_id=permission.requestor_id,
+ task=task
+ )
except IntegrityError:
raise HTTPException(
status_code=409,
@@ -85,75 +113,75 @@ async def create_task(task: Task, db_accessor=Depends(get_DbAccessor)):
)
except NoResultFound:
raise HTTPException(status_code=404, detail='Failed to find pipeline for this task')
+
return created_task
+
@router.put(
"/",
response_model=Task,
- summary="Update one task.",
responses={
status.HTTP_200_OK: {"description": "Task was modified"},
- status.HTTP_404_NOT_FOUND: {
- "description": "The pipeline or task in the request is invalid"
- },
- }
+ },
+ summary="Update one task.",
+ description='''
+ Given a Task object, updates the status of the task in the database
+ to the value of the status in this Task object.
+
+ If the task does not exist, status 404 'Not found' is returned.'''
)
-async def update_task(task: Task, db_accessor=Depends(get_DbAccessor)):
- """
- Given a Task object, updates the status of the task in the database.
+async def update_task(
+ task: Task,
+ db_accessor=Depends(get_DbAccessor),
+ permission=Depends(validate)
+) -> Task:
- The pipeline specified by the `pipeline` attribute of the Task object
- should exist. If it does not exist, return status 404 'Not found' and
- an error.
- """
+ _validate_request(permission, task.pipeline)
changed_task = None
try:
- changed_task = await db_accessor.update_task(token_id=1, task=task)
+ changed_task = await db_accessor.update_task(
+ token_id=permission.requestor_id,
+ task=task
+ )
except NoResultFound as e:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
+
return changed_task
+
@router.post(
"/claim",
- response_model=List[Task],
- summary="Claim tasks.",
- description="Claim tasks for a particular pipeline.",
+ response_model=list[Task],
responses={
- status.HTTP_200_OK: {"description": "Receive a list of tasks that have been claimed"},
- status.HTTP_404_NOT_FOUND: {
- "description": "Cannot find the pipeline submitted with the claim"
- }
- }
-)
-async def claim_task(
- pipeline: Pipeline,
- num_tasks: PositiveInt = 1,
- db_accessor=Depends(get_DbAccessor)
-) -> List[Task]:
- """
+ status.HTTP_200_OK: {"description": "Receive a list of tasks that have been claimed"}
+ },
+ summary="Claim tasks for a particular pipeline.",
+ description='''
Arguments - the Pipeline object and the maximum number of tasks
to retrieve and claim, the latter defaults to 1 if not given.
- Return an error and status 404 'Not Found' if the pipeline with the
- given name does not exist.
-
- It is possible that no tasks that satisfy the given criteria and
- are unclaimed are found. Return status 200 and an empty array.
+ If no tasks that satisfy the given criteria and are unclaimed
+ are found, returns status 200 and an empty array.
- If any tasks are claimed, return an array of these Task objects and
- status 200.
+ If any tasks are claimed, return an array of these Task objects
+ and status 200.
- The pipeline object returned within the Task should be consistent
- with the pipeline object in the payload, but, typically, will have
- more attributes defined (uri, the specific version).
- """
-
- try:
- tasks = await db_accessor.claim_tasks(
- token_id=1,
- pipeline=pipeline,
- claim_limit=num_tasks
- )
- return tasks
- except NoResultFound as e:
- raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=e.value)
+ The pipeline object returned within each of the tasks is consistent
+ with the pipeline object in the payload, but has all possible
+ attributes defined (uri, version).'''
+)
+async def claim_task(
+ pipeline: Pipeline,
+ num_tasks: Annotated[int | None, Query(gt=0)] = 1,
+ db_accessor=Depends(get_DbAccessor),
+ permission=Depends(validate)
+) -> list[Task]:
+
+ _validate_request(permission, pipeline)
+ tasks = await db_accessor.claim_tasks(
+ token_id=permission.requestor_id,
+ pipeline=pipeline,
+ claim_limit=num_tasks
+ )
+
+ return tasks
diff --git a/server/npg/porch/models/permission.py b/server/npg/porch/models/permission.py
index f4d1cd9..df5b62c 100644
--- a/server/npg/porch/models/permission.py
+++ b/server/npg/porch/models/permission.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2021, 2022 Genome Research Ltd.
+# Copyright (C) 2022, 2023 Genome Research Ltd.
#
# Author: Kieron Taylor kt19@sanger.ac.uk
# Author: Marina Gourtovaia mg8@sanger.ac.uk
@@ -19,16 +19,23 @@
# this program. If not, see <http://www.gnu.org/licenses/>.
from enum import Enum
-from pydantic import BaseModel, Field, validator
+from pydantic import BaseModel, Field, field_validator, FieldValidationInfo
from typing import Optional
from npg.porch.models.pipeline import Pipeline
+
+class PermissionValidationException(Exception):
+ pass
+
+
class RolesEnum(str, Enum):
POWER_USER = 'power_user'
REGULAR_USER = 'regular_user'
+
class Permission(BaseModel):
+
pipeline: Optional[Pipeline] = Field(
None,
title = 'An optional pipeline object',
@@ -42,9 +49,27 @@ class Permission(BaseModel):
title = 'A role associated with the presented credentials',
)
- @validator('role')
- def no_pipeline4special_users(cls, v, values):
+ @field_validator('role')
+ @classmethod
+ def no_pipeline4special_users(cls, v: str, info: FieldValidationInfo):
+
if (v == RolesEnum.POWER_USER
- and ('pipeline' in values and values['pipeline'] is not None)):
+ and ('pipeline' in info.data and info.data['pipeline'] is not None)):
raise ValueError('Power user cannot be associated with a pipeline')
+
return v
+
+ def validate_pipeline(self, pipeline: Pipeline):
+
+ if self.role != RolesEnum.REGULAR_USER:
+ raise PermissionValidationException(
+ f"Operation is not valid for role {self.role}")
+ if not self.pipeline:
+ raise PermissionValidationException("No associated pipeline object")
+
+ if pipeline.name != self.pipeline.name:
+ raise PermissionValidationException(
+ "Token-request pipeline mismatch: "
+ + f"'{self.pipeline.name}' and '{pipeline.name}'")
+
+ pass
diff --git a/server/npg/porch/models/pipeline.py b/server/npg/porch/models/pipeline.py
index 2ffa1f1..ce212b8 100644
--- a/server/npg/porch/models/pipeline.py
+++ b/server/npg/porch/models/pipeline.py
@@ -19,7 +19,6 @@
# this program. If not, see <http://www.gnu.org/licenses/>.
from pydantic import BaseModel, Field
-from typing import Optional
class Pipeline(BaseModel):
name: str = Field(
@@ -27,12 +26,12 @@ class Pipeline(BaseModel):
title='Pipeline Name',
description='A user-controlled name for the pipeline'
)
- uri: Optional[str] = Field(
+ uri: str | None = Field(
default = None,
title='URI',
description='URI to bootstrap the pipeline code'
)
- version: Optional[str] = Field(
+ version: str | None = Field(
default = None,
title='Version',
description='Pipeline version to use with URI'
diff --git a/server/npg/porch/models/task.py b/server/npg/porch/models/task.py
index 42bd167..788518d 100644
--- a/server/npg/porch/models/task.py
+++ b/server/npg/porch/models/task.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2021, 2022 Genome Research Ltd.
+# Copyright (C) 2021, 2022, 2023 Genome Research Ltd.
#
# Author: Kieron Taylor kt19@sanger.ac.uk
# Author: Marina Gourtovaia mg8@sanger.ac.uk
@@ -22,7 +22,6 @@
import hashlib
import ujson
from pydantic import BaseModel, Field
-from typing import Optional, Dict
from npg.porch.models.pipeline import Pipeline
@@ -36,17 +35,17 @@ class TaskStateEnum(str, Enum):
class Task(BaseModel):
pipeline: Pipeline
- task_input_id: Optional[str] = Field(
+ task_input_id: str | None = Field(
None,
title='Task Input ID',
description='A stringified unique identifier for a piece of work. Set by the npg_porch server, not the client' # noqa: E501
)
- task_input: Dict = Field(
+ task_input: dict = Field(
None,
title='Task Input',
description='A structured parameter set that uniquely identifies a piece of work, and enables an iteration of a pipeline' # noqa: E501
)
- status: Optional[TaskStateEnum]
+ status: TaskStateEnum | None = None
def generate_task_id(self):
return hashlib.sha256(ujson.dumps(self.task_input, sort_keys=True).encode()).hexdigest()
@@ -65,13 +64,13 @@ def __eq__(self, other):
'''
if not isinstance(other, Task):
if isinstance(other, dict):
- other = Task.parse_obj(other)
+ other = Task.model_validate(other)
else:
return False
truths = []
- for k, v in self.dict().items():
- other_d = other.dict()
+ for k, v in self.model_dump().items():
+ other_d = other.model_dump()
if k == 'pipeline':
truths.append(v['name'] == other_d[k]['name'])
elif k == 'task_input_id':
diff --git a/server/npg/porchdb/auth.py b/server/npg/porchdb/auth.py
new file mode 100644
index 0000000..3d30ef5
--- /dev/null
+++ b/server/npg/porchdb/auth.py
@@ -0,0 +1,92 @@
+# Copyright (C) 2022 Genome Research Ltd.
+#
+# Author: Kieron Taylor kt19@sanger.ac.uk
+# Author: Marina Gourtovaia mg8@sanger.ac.uk
+#
+# This file is part of npg_porch
+#
+# npg_porch is free software: you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the Free
+# Software Foundation; either version 3 of the License, or (at your option) any
+# later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
+# details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+
+import re
+from sqlalchemy import select
+from sqlalchemy.orm import contains_eager
+from sqlalchemy.orm.exc import NoResultFound
+
+from npg.porchdb.models import Token
+from npg.porch.models.permission import Permission, RolesEnum
+
+__AUTH_TOKEN_LENGTH__ = 32
+__AUTH_TOKEN_REGEXP__ = re.compile(
+ r'\A[0-9A-F]+\Z', flags = re.ASCII | re.IGNORECASE)
+
+class CredentialsValidationException(Exception):
+ pass
+
+
+class Validator:
+ '''
+ A validator for credentials presented by the requestor.
+
+ Instantiate with a sqlalchemy AsyncSession
+ '''
+
+ def __init__(self, session):
+ self.session = session
+
+ async def token2permission(self, token: str):
+
+ if len(token) != __AUTH_TOKEN_LENGTH__:
+ raise CredentialsValidationException(
+ f"The token should be {__AUTH_TOKEN_LENGTH__} chars long"
+ )
+ elif __AUTH_TOKEN_REGEXP__.match(token) is None:
+ raise CredentialsValidationException(
+ 'Token failed character validation'
+ )
+
+ valid_token_row = None
+ try:
+ # Using 'outerjoin' to get the left join for token, pipeline.
+ # We need to retrieve all token rows, regardless of whether
+ # they are linked the pipeline table or not (we are using a
+ # nullable foreign key to allow for no link).
+ result = await self.session.execute(
+ select(Token)
+ .filter_by(token=token)
+ .outerjoin(Token.pipeline)
+ .options(contains_eager(Token.pipeline))
+ )
+ valid_token_row = result.scalar_one()
+ except NoResultFound:
+ raise CredentialsValidationException('An unknown token is used')
+
+ if (valid_token_row is not None) and (valid_token_row.date_revoked is not None):
+ raise CredentialsValidationException('A revoked token is used')
+
+ permission = None
+ pipeline = valid_token_row.pipeline
+ token_id = valid_token_row.token_id
+ if pipeline is None:
+ permission = Permission(
+ role = RolesEnum.POWER_USER,
+ requestor_id = token_id
+ )
+ else:
+ permission = Permission(
+ role = RolesEnum.REGULAR_USER,
+ requestor_id = token_id,
+ pipeline = pipeline.convert_to_model()
+ )
+
+ return permission
diff --git a/server/npg/porchdb/connection.py b/server/npg/porchdb/connection.py
index 1bc559b..1374db3 100644
--- a/server/npg/porchdb/connection.py
+++ b/server/npg/porchdb/connection.py
@@ -24,6 +24,7 @@
from npg.porchdb.models import Base
from npg.porchdb.data_access import AsyncDbAccessor
+from npg.porchdb.auth import Validator
config = {
'DB_URL': os.environ.get('DB_URL'),
@@ -36,10 +37,24 @@
# config['DB_URL'] = 'sqlite+aiosqlite:///test.db'
if config['DB_URL'] is None or config['DB_URL'] == '':
- raise Exception("ENV['DB_URL'] must be set with a database URL")
+ raise Exception(
+ "ENV['DB_URL'] must be set with a database URL, or NPG_PORCH_MODE must be set for testing"
+ )
+
+# asyncpg driver receives options differently to psycopg
+# Embed them inside a server_settings dict
+connect_args = {}
+if config['TEST'] is None:
+ connect_args = {
+ 'server_settings': {
+ 'options': '-csearch_path={}'.format(config["DB_SCHEMA"])
+ }
+ }
engine = create_async_engine(
- config['DB_URL'], future=True
+ config['DB_URL'],
+ connect_args=connect_args,
+ pool_pre_ping=True
)
Base.metadata.schema = config['DB_SCHEMA']
session_factory = sessionmaker(
@@ -47,13 +62,28 @@
)
async def get_DbAccessor():
- 'Provides a hook for fastapi to Depend on a DB session in each route'
+ '''
+ Provides a hook for fastapi to Depend on a DB session in each route.
+
+ Yields an instance of AsyncDbAccessor class, which provides an API
+ for access to data.
+
+ Starts a transaction that finishes automatically when the returned
+ object drops out of scope.
+ '''
async with session_factory() as session:
- # Starting a transaction that finished automatically when the returned
- # object drops out of scope
async with session.begin():
yield AsyncDbAccessor(session)
+async def get_CredentialsValidator():
+ '''
+ Similar to get_DbAccessor, but yields an instance of the Validator class,
+ which provides methods for validating credentials submitted with the
+ request.
+ '''
+ async with session_factory() as session:
+ async with session.begin():
+ yield Validator(session)
async def deploy_schema():
async with engine.begin() as conn:
diff --git a/server/npg/porchdb/data_access.py b/server/npg/porchdb/data_access.py
index 49bb7b3..effd0e4 100644
--- a/server/npg/porchdb/data_access.py
+++ b/server/npg/porchdb/data_access.py
@@ -23,7 +23,6 @@
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import contains_eager, joinedload
from sqlalchemy.orm.exc import NoResultFound
-from typing import Optional, List
from npg.porchdb.models import Pipeline as DbPipeline, Task as DbTask, Event
from npg.porch.models import Task, Pipeline, TaskStateEnum
@@ -55,10 +54,10 @@ async def _get_pipeline_db_object(self, name: str) -> Pipeline:
async def _get_pipeline_db_objects(
self,
- name: Optional[str] = None,
- version: Optional[str] = None,
- uri: Optional[str] = None
- ) -> List[Pipeline]:
+ name: str | None = None,
+ version: str | None = None,
+ uri: str | None = None
+ ) -> list[Pipeline]:
query = select(DbPipeline)
if name:
query = query.filter_by(name=name)
@@ -72,9 +71,9 @@ async def _get_pipeline_db_objects(
async def get_all_pipelines(
self,
- uri: Optional[str] = None,
- version: Optional[str] = None
- ) -> List[Pipeline]:
+ uri: str | None = None,
+ version: str | None = None
+ ) -> list[Pipeline]:
pipelines = []
pipelines = await self._get_pipeline_db_objects(uri=uri, version=version)
return [pipe.convert_to_model() for pipe in pipelines]
@@ -117,8 +116,8 @@ async def create_task(self, token_id: int, task: Task) -> Task:
return t.convert_to_model()
async def claim_tasks(
- self, token_id: int, pipeline: Pipeline, claim_limit: Optional[int] = 1
- ) -> List[Task]:
+ self, token_id: int, pipeline: Pipeline, claim_limit: int | None = 1
+ ) -> list[Task]:
session = self.session
try:
@@ -194,14 +193,27 @@ async def update_task(self, token_id: int, task: Task) -> Task:
return og_task.convert_to_model()
- async def get_tasks(self) -> List[Task]:
+ async def get_tasks(
+ self,
+ pipeline_name: str | None = None,
+ task_status: TaskStateEnum | None = None
+ ) -> list[Task]:
'''
- Gets all the tasks. Going to be problematic without filtering
+ Gets all the tasks.
+
+ Tasks can be filtered by pipeline name and/or task status.
'''
- task_result = await self.session.execute(
- select(DbTask)
+ query = select(DbTask)\
+ .join(DbTask.pipeline)\
.options(joinedload(DbTask.pipeline))
- )
+
+ if pipeline_name:
+ query = query.where(DbPipeline.name == pipeline_name)
+
+ if task_status:
+ query = query.filter(DbTask.state == task_status)
+
+ task_result = await self.session.execute(query)
tasks = task_result.scalars().all()
return [t.convert_to_model() for t in tasks]
@@ -216,7 +228,7 @@ def convert_task_to_db(task: Task, pipeline: DbPipeline) -> DbTask:
state=task.status
)
- async def get_events_for_task(self, task: Task) -> List[Event]:
+ async def get_events_for_task(self, task: Task) -> list[Event]:
events = await self.session.execute(
select(Event)
.join(Event.task)
diff --git a/server/npg/porchdb/models/base.py b/server/npg/porchdb/models/base.py
index 7d796e0..db40e83 100644
--- a/server/npg/porchdb/models/base.py
+++ b/server/npg/porchdb/models/base.py
@@ -18,6 +18,6 @@
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
-from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import declarative_base
Base = declarative_base()
diff --git a/server/npg/version.py b/server/npg/version.py
deleted file mode 100644
index b794fd4..0000000
--- a/server/npg/version.py
+++ /dev/null
@@ -1 +0,0 @@
-__version__ = '0.1.0'
diff --git a/server/tests/model_permission_test.py b/server/tests/model_permission_test.py
deleted file mode 100644
index ff103e5..0000000
--- a/server/tests/model_permission_test.py
+++ /dev/null
@@ -1,46 +0,0 @@
-import pytest
-
-from npg.porch.models.pipeline import Pipeline
-from npg.porch.models.permission import Permission
-from pydantic.error_wrappers import ValidationError
-
-def test_model_create():
- ''''
- Test objects can be created.
- '''
-
- p = Permission(requestor_id = 3, role = 'power_user')
- assert type(p) is Permission
- p = Permission(
- requestor_id = 1,
- role = 'regular_user',
- pipeline = Pipeline(name='number one')
- )
- assert type(p) is Permission
-
-def test_xvalidation_role_pipeline():
- '''
- Test cross validation for the role and pipeline fields.
- '''
-
- with pytest.raises(
- ValidationError,
- match = r'Power user cannot be associated with a pipeline'):
- Permission(
- requestor_id = 3,
- role = 'power_user',
- pipeline = Pipeline(name='number one')
- )
-
-def test_error_with_insufficient_args():
-
- with pytest.raises(ValidationError, match=r'requestor_id\s+field required'):
- Permission(
- role = 'regular_user',
- pipeline = Pipeline(name='number one')
- )
- with pytest.raises(ValidationError, match=r'role\s+field required'):
- Permission(
- requestor_id = 1,
- pipeline = Pipeline(name='number one')
- )
diff --git a/server/tests/task_route_test.py b/server/tests/task_route_test.py
deleted file mode 100644
index cb85f85..0000000
--- a/server/tests/task_route_test.py
+++ /dev/null
@@ -1,133 +0,0 @@
-from starlette import status
-
-from npg.porch.models import Task, TaskStateEnum
-
-# Not testing get-all-tasks as this method will ultimately go
-
-def test_task_creation(async_minimum, fastapi_testclient):
- # Create a task with a sparse pipeline definition
- task_one = Task(
- pipeline = {
- 'name': 'ptest one'
- },
- task_input = {
- 'number': 1
- }
- )
-
- response = fastapi_testclient.post(
- 'tasks',
- json=task_one.dict(),
- allow_redirects=True
- )
-
- assert response.status_code == status.HTTP_201_CREATED
- assert task_one == response.json()
-
- # Try again and expect to fail
- response = fastapi_testclient.post(
- 'tasks',
- json=task_one.dict(),
- allow_redirects=True
- )
-
- assert response.status_code == status.HTTP_409_CONFLICT
-
- task_two = Task(
- pipeline = {
- 'name': 'ptest none'
- },
- task_input = {
- 'number': 1
- }
- )
- response = fastapi_testclient.post(
- 'tasks',
- json=task_two.dict(),
- allow_redirects=True
- )
- assert response.status_code == status.HTTP_404_NOT_FOUND
-
-def test_task_update(async_minimum, fastapi_testclient):
- task = fastapi_testclient.get('/tasks').json()[0]
-
- assert task['status'] is None
- task['status'] = TaskStateEnum.PENDING
-
- response = fastapi_testclient.put(
- '/tasks',
- json=task,
- allow_redirects=True
- )
- assert response.status_code == status.HTTP_200_OK
- modified_task = Task.parse_obj(response.json())
-
- assert modified_task == task
-
- # Now invalidate the task by changing the signature
- modified_task.task_input = {
- 'something': 'different'
- }
-
- response = fastapi_testclient.put(
- '/tasks',
- json=modified_task.dict(),
- allow_redirects=True
- )
- assert response.status_code == status.HTTP_404_NOT_FOUND
- assert response.json() == {'detail': 'Task to be modified could not be found'}
-
- # And change the reference pipeline to something wrong
- modified_task.pipeline = {
- 'name': 'ptest one thousand'
- }
-
- response = fastapi_testclient.put(
- '/tasks',
- json=modified_task.dict(),
- allow_redirects=True
- )
- assert response.status_code == status.HTTP_404_NOT_FOUND
- assert response.json() == {'detail': 'Pipeline not found'}
-
-def test_task_claim(async_tasks, fastapi_testclient):
- response = fastapi_testclient.get('/pipelines/ptest one')
-
- assert response.status_code == status.HTTP_200_OK
- pipeline = response.json()
-
- tasks_seen = []
-
- response = fastapi_testclient.post('/tasks/claim', json=pipeline)
- assert response.status_code == status.HTTP_200_OK
- tasks = response.json()
-
- assert len(tasks) == 1, 'Defaults to one task claimed'
- t = tasks[0]
- assert t['task_input'] == {'input': 1}
- assert t['status'] == TaskStateEnum.CLAIMED
- tasks_seen.append(t['task_input_id'])
-
- response = fastapi_testclient.post('/tasks/claim?num_tasks=0', json=pipeline)
- assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY, 'Not allowed to use invalid numbers of tasks' # noqa: E501
-
- response = fastapi_testclient.post('/tasks/claim?num_tasks=2', json=pipeline)
- assert response.status_code == status.HTTP_200_OK
- tasks = response.json()
- assert len(tasks) == 2, 'Asked for two, got two'
- tasks_seen.extend([t['task_input_id'] for t in tasks])
-
- # Cannot test race conditions, because sqlite only pretends to support full async
- # Claim the rest
- response = fastapi_testclient.post('/tasks/claim?num_tasks=8', json=pipeline)
- assert response.status_code == status.HTTP_200_OK
- tasks = response.json()
- assert len(tasks) == 7, 'Asked for eight, got seven'
- tasks_seen.extend([t['task_input_id'] for t in tasks])
-
- assert len(set(tasks_seen)) == 10, 'Ten unique tasks were claimed'
-
- response = fastapi_testclient.post('/tasks/claim', json=pipeline)
- assert response.status_code == status.HTTP_200_OK
- tasks = response.json()
- assert len(tasks) == 0, 'Tried to claim, did not get any tasks'
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..9eb1408
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,11 @@
+[metadata]
+name = npg_porch
+version = 1.0.0
+
+[options]
+package_dir =
+ =server
+packages = find:
+
+[options.packages.find]
+where=server
\ No newline at end of file
diff --git a/setup.py b/setup.py
deleted file mode 100644
index 4247863..0000000
--- a/setup.py
+++ /dev/null
@@ -1,40 +0,0 @@
-from setuptools import setup
-from distutils.util import convert_path
-
-version_path = convert_path('server/npg/version.py')
-namespace = {}
-with open(version_path) as ver_file:
- exec(ver_file.read(), namespace)
-
-setup(
- name='npg-porch',
- version=namespace['__version__'],
- package_dir={
- 'npg': './server'
- },
- packages=['npg'],
- license='GNU General Public License v3.0',
- author='Wellcome Sanger Institute',
- author_email='npg@sanger.ac.uk',
- description='Work allocation and tracking for portable pipelines',
- install_requires=[
- 'aiosqlite',
- 'asyncpg',
- 'fastapi',
- 'pydantic',
- 'pysqlite3',
- 'psycopg2-binary',
- 'sqlalchemy>=1.4.29',
- 'ujson',
- 'uvicorn',
- 'uuid'
- ],
- extras_require={
- 'test': [
- 'pytest',
- 'pytest-asyncio',
- 'requests',
- 'flake8'
- ]
- }
-)
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/server/tests/conftest.py b/tests/conftest.py
similarity index 100%
rename from server/tests/conftest.py
rename to tests/conftest.py
diff --git a/server/tests/data_access_test.py b/tests/data_access_test.py
similarity index 81%
rename from server/tests/data_access_test.py
rename to tests/data_access_test.py
index 2cc0ab2..c463b0c 100644
--- a/server/tests/data_access_test.py
+++ b/tests/data_access_test.py
@@ -19,7 +19,6 @@ def give_me_a_pipeline(number: int = 1):
async def store_me_a_pipeline(dac: AsyncDbAccessor, number: int = 1) -> ModelledPipeline:
return await dac.create_pipeline(give_me_a_pipeline(number))
-@pytest.mark.asyncio
def test_data_accessor_setup(async_session):
with pytest.raises(TypeError):
dac = AsyncDbAccessor()
@@ -228,3 +227,56 @@ async def test_update_tasks(db_accessor):
with pytest.raises(Exception) as exception:
await db_accessor.update_task(1, saved_task)
assert exception.value == 'Cannot change task definition. Submit a new task instead'
+
+
+@pytest.mark.asyncio
+async def test_get_tasks(db_accessor):
+ all_tasks = await db_accessor.get_tasks()
+
+ assert len(all_tasks) == 2, 'The fixture provides two tasks for one pipeline'
+
+ tasks = await db_accessor.get_tasks(pipeline_name = 'ptest one')
+
+ assert tasks == all_tasks, 'Filtering by pipeline name gives the same result'
+
+ tasks = await db_accessor.get_tasks(task_status = TaskStateEnum.FAILED)
+ assert len(tasks) == 0, 'No failed tasks yet'
+
+ # Create an additional pipeline and tasks
+
+ pipeline = await store_me_a_pipeline(db_accessor, 2)
+
+ for i in range(3):
+ await db_accessor.create_task(
+ token_id=1,
+ task=Task(
+ task_input={'number': i + 1},
+ pipeline=pipeline
+ )
+ )
+
+ all_tasks = await db_accessor.get_tasks()
+ assert len(all_tasks) == 5, 'Now we have five tasks in two pipelines'
+
+ tasks = await db_accessor.get_tasks(pipeline_name = 'ptest one')
+ assert len(tasks) == 2, 'New tasks filtered out by pipeline name'
+ assert tasks[0].pipeline.name == 'ptest one'
+
+ # Change one task to another status
+ await db_accessor.update_task(
+ token_id = 1,
+ task=Task(
+ task_input={'number': 3},
+ pipeline=pipeline,
+ status=TaskStateEnum.DONE
+ )
+ )
+
+ tasks = await db_accessor.get_tasks(task_status = TaskStateEnum.DONE)
+ assert len(tasks) == 1, 'Tasks that are not DONE are filtered out'
+ assert tasks[0].task_input == {'number': 3}, 'Leaving only one'
+
+ # Check interaction of both constraints
+
+ tasks = await db_accessor.get_tasks(pipeline_name='ptest one', task_status=TaskStateEnum.DONE)
+ assert len(tasks) == 0, 'Pipeline "ptest one" has no DONE tasks'
diff --git a/tests/db_auth_test.py b/tests/db_auth_test.py
new file mode 100644
index 0000000..e155452
--- /dev/null
+++ b/tests/db_auth_test.py
@@ -0,0 +1,90 @@
+import pytest
+import datetime
+from sqlalchemy import select
+
+from npg.porchdb.models import Token, Pipeline
+from npg.porchdb.auth import Validator, CredentialsValidationException
+import npg.porch.models.permission
+import npg.porch.models.pipeline
+
+@pytest.mark.asyncio
+async def test_token_string_is_valid(async_minimum):
+
+ v = Validator(session = async_minimum)
+ assert isinstance(v, (npg.porchdb.auth.Validator))
+
+ with pytest.raises(CredentialsValidationException,
+ match=r'The token should be 32 chars long'):
+ await v.token2permission("")
+ with pytest.raises(CredentialsValidationException,
+ match=r'The token should be 32 chars long'):
+ await v.token2permission('aaaa')
+ with pytest.raises(CredentialsValidationException,
+ match=r'The token should be 32 chars long'):
+ await v.token2permission('AAAAAAAAAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
+ # This token contains punctuation characters.
+ with pytest.raises(CredentialsValidationException,
+ match=r'Token failed character validation'):
+ await v.token2permission('7dc1457531e3495?9bd5:bcda579c1c6')
+ # This token contains characters beyond F.
+ with pytest.raises(CredentialsValidationException,
+ match=r'Token failed character validation'):
+ await v.token2permission('7dc1457531e3495P9bd5Kbcda579c1c6')
+
+@pytest.mark.asyncio
+async def test_token_is_known_and_valid(async_minimum):
+
+ v = Validator(session = async_minimum)
+
+ # This token does not exist.
+ with pytest.raises(CredentialsValidationException,
+ match=r'An unknown token is used'):
+ await v.token2permission('aaaaaaaBBaaa11111111111111111111')
+
+ # Mark one of the tokens as revoked.
+ result = await async_minimum.execute(
+ select(Token)
+ .filter_by(description='OpenStack host, job finder')
+ )
+ token_row = result.scalar_one()
+ token_string = token_row.token
+ token_row.date_revoked = datetime.date(2022, 1, 1)
+ async_minimum.add(token_row)
+ await async_minimum.commit()
+
+ with pytest.raises(CredentialsValidationException,
+ match=r'A revoked token is used'):
+ await v.token2permission(token_string)
+
+@pytest.mark.asyncio
+async def test_permission_object_is_returned(async_minimum):
+
+ # The fixtures have a token not associated with any pipeline.
+ # To model data realistically, create a pipeline not associated
+ # with any token.
+ async_minimum.add(Pipeline(
+ name='ptest ten',
+ repository_uri='pipeline-testten.com',
+ version='0.3.15'
+ ))
+ await async_minimum.commit()
+
+ v = Validator(session = async_minimum)
+ result = await async_minimum.execute(select(Token))
+ token_rows = result.scalars().all()
+
+ for t in token_rows:
+ if t.description == 'Seqfarm host, job runner':
+ p = await v.token2permission(t.token)
+ assert isinstance(p, (npg.porch.models.permission.Permission))
+ assert p.pipeline is not None
+ assert isinstance(p.pipeline, (npg.porch.models.pipeline.Pipeline))
+ assert p.pipeline.name == 'ptest one'
+ assert p.requestor_id == t.token_id
+ assert p.role == 'regular_user'
+ elif t.description == 'Seqfarm host, admin':
+ p = await v.token2permission(t.token)
+ assert isinstance(p, (npg.porch.models.permission.Permission))
+ assert p.pipeline is None
+ assert p.requestor_id == t.token_id
+ assert p.role == 'power_user'
diff --git a/server/tests/db_task_test.py b/tests/db_task_test.py
similarity index 100%
rename from server/tests/db_task_test.py
rename to tests/db_task_test.py
diff --git a/server/tests/db_token_test.py b/tests/db_token_test.py
similarity index 100%
rename from server/tests/db_token_test.py
rename to tests/db_token_test.py
diff --git a/tests/fixtures/__init__.py b/tests/fixtures/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/server/tests/fixtures/deploy_db.py b/tests/fixtures/deploy_db.py
similarity index 94%
rename from server/tests/fixtures/deploy_db.py
rename to tests/fixtures/deploy_db.py
index 38d02ae..1a0d966 100644
--- a/server/tests/fixtures/deploy_db.py
+++ b/tests/fixtures/deploy_db.py
@@ -8,7 +8,7 @@
)
from npg.porchdb.data_access import AsyncDbAccessor
from npg.porch.models import Task as ModelledTask, TaskStateEnum
-from main import app
+from npg.main import app
@pytest.fixture
def minimum_data():
@@ -21,6 +21,7 @@ def minimum_data():
)
tokens = [
Token(
+ token='cac0533d5599489d9a3d998028a79fe8',
pipeline=pipeline,
description='OpenStack host, job finder'
),
@@ -29,6 +30,7 @@ def minimum_data():
description='Seqfarm host, job runner'
),
Token(
+ token='4bab73544c834c6f86f9662e5de26d0d',
description='Seqfarm host, admin'
)
]
@@ -70,11 +72,12 @@ def lots_of_tasks():
'A good supply of tasks for testing claims'
pipeline = Pipeline(
- name='ptest one',
+ name='ptest some',
repository_uri='pipeline-test.com',
version='0.3.14'
)
job_finder_token = Token(
+ token='ba53eaf7073d4c2b95ca47aeed41086c',
pipeline=pipeline,
description='OpenStack host, job finder'
)
diff --git a/server/tests/fixtures/orm_session.py b/tests/fixtures/orm_session.py
similarity index 95%
rename from server/tests/fixtures/orm_session.py
rename to tests/fixtures/orm_session.py
index fb6dd10..7cdbb09 100644
--- a/server/tests/fixtures/orm_session.py
+++ b/tests/fixtures/orm_session.py
@@ -16,7 +16,7 @@ def sync_session():
'''
sqlite_url = 'sqlite+pysqlite:///:memory:'
- engine = sqlalchemy.create_engine(sqlite_url, future=True)
+ engine = sqlalchemy.create_engine(sqlite_url)
Base.metadata.create_all(engine)
SessionFactory = sqlalchemy.orm.sessionmaker(bind=engine)
sess = sqlalchemy.orm.scoped_session(SessionFactory)
diff --git a/server/tests/init_test.py b/tests/init_test.py
similarity index 100%
rename from server/tests/init_test.py
rename to tests/init_test.py
diff --git a/tests/model_permission_test.py b/tests/model_permission_test.py
new file mode 100644
index 0000000..23d3716
--- /dev/null
+++ b/tests/model_permission_test.py
@@ -0,0 +1,82 @@
+import pytest
+
+from npg.porch.models.pipeline import Pipeline
+from npg.porch.models.permission import Permission, PermissionValidationException
+from pydantic import ValidationError
+
+
+def test_model_create():
+ '''
+ Test objects can be created.
+ '''
+
+ p = Permission(requestor_id = 3, role = 'power_user')
+ assert type(p) is Permission
+ p = Permission(
+ requestor_id = 1,
+ role = 'regular_user',
+ pipeline = Pipeline(name='number one')
+ )
+ assert type(p) is Permission
+
+
+def test_xvalidation_role_pipeline():
+ '''
+ Test cross validation for the role and pipeline fields.
+ '''
+
+ with pytest.raises(
+ ValidationError,
+ match = r'Power user cannot be associated with a pipeline'):
+ Permission(
+ requestor_id = 3,
+ role = 'power_user',
+ pipeline = Pipeline(name='number one')
+ )
+
+
+def test_error_with_insufficient_args():
+
+ with pytest.raises(ValidationError, match=r'requestor_id\s+Field required'):
+ Permission(
+ role = 'regular_user',
+ pipeline = Pipeline(name='number one')
+ )
+ with pytest.raises(ValidationError, match=r'role\s+Field required'):
+ Permission(
+ requestor_id = 1,
+ pipeline = Pipeline(name='number one')
+ )
+
+
+def test_pipeline_validation():
+
+ pipeline = Pipeline(name='number one')
+ permission = Permission(
+ requestor_id = 3,
+ role = 'power_user')
+ with pytest.raises(PermissionValidationException,
+ match=r'Operation is not valid for role power_user'):
+ permission.validate_pipeline(pipeline)
+
+ permission = Permission(
+ requestor_id = 3,
+ role = 'regular_user')
+ with pytest.raises(PermissionValidationException,
+ match=r'No associated pipeline object'):
+ permission.validate_pipeline(pipeline)
+
+ permission = Permission(
+ requestor_id = 3,
+ role = 'regular_user',
+ pipeline = Pipeline(name='number two'))
+ with pytest.raises(
+ PermissionValidationException,
+ match=r"Token-request pipeline mismatch: 'number two' and 'number one'"):
+ permission.validate_pipeline(pipeline)
+
+ permission = Permission(
+ requestor_id = 3,
+ role = 'regular_user',
+ pipeline = pipeline)
+ assert (permission.validate_pipeline(pipeline) is None), 'no error'
diff --git a/server/tests/pipeline_route_test.py b/tests/pipeline_route_test.py
similarity index 60%
rename from server/tests/pipeline_route_test.py
rename to tests/pipeline_route_test.py
index b29db0f..bf0f5d8 100644
--- a/server/tests/pipeline_route_test.py
+++ b/tests/pipeline_route_test.py
@@ -3,19 +3,50 @@
 from npg.porch.models import Pipeline
+headers = {
+    'Authorization': 'Bearer cac0533d5599489d9a3d998028a79fe8',
+    'accept': 'application/json'
+}
+headers4power_user = {
+    'Authorization': 'Bearer 4bab73544c834c6f86f9662e5de26d0d',
+    'accept': 'application/json'
+}
+
+
 def http_create_pipeline(fastapi_testclient, pipeline):
-    response = fastapi_testclient.post('/pipelines', json=pipeline.dict(), allow_redirects=True)
+
+    response = fastapi_testclient.post(
+        '/pipelines', json=pipeline.model_dump(), follow_redirects=True
+    )
+    assert response.status_code == status.HTTP_403_FORBIDDEN
+
+    response = fastapi_testclient.post(
+        '/pipelines', json=pipeline.model_dump(), follow_redirects=True,
+        headers=headers
+    )
+    assert response.status_code == status.HTTP_403_FORBIDDEN
+
+    response = fastapi_testclient.post(
+        '/pipelines', json=pipeline.model_dump(), follow_redirects=True,
+        headers=headers4power_user
+    )
     assert response.status_code == status.HTTP_201_CREATED
+
     return response.json()
+
 def test_pipeline_get(async_minimum, fastapi_testclient):
+
     response = fastapi_testclient.get('/pipelines')
+    assert response.status_code == status.HTTP_403_FORBIDDEN
+    response = fastapi_testclient.get('/pipelines', headers=headers)
     assert response.status_code == status.HTTP_200_OK
-    pipeline = Pipeline.parse_obj(response.json()[0])
+    pipeline = Pipeline.model_validate(response.json()[0])
     assert pipeline, 'Response fits into the over-the-wire model'
     assert pipeline.name == 'ptest one'
     assert pipeline.version == '0.3.14'
+
 def test_pipeline_filtered_get(async_minimum, fastapi_testclient):
second_pipeline = Pipeline(
@@ -33,32 +64,43 @@ def test_pipeline_filtered_get(async_minimum, fastapi_testclient):
     http_create_pipeline(fastapi_testclient, second_pipeline)
     http_create_pipeline(fastapi_testclient, third_pipeline)
-    response = fastapi_testclient.get('/pipelines?version=0.3.14')
+    response = fastapi_testclient.get(
+        '/pipelines?version=0.3.14', headers=headers
+    )
     assert response.status_code == status.HTTP_200_OK
     pipes = response.json()
     assert len(pipes) == 3, 'All three pipelines have the same version'
-    response = fastapi_testclient.get('/pipelines?uri=http://test.com')
+    response = fastapi_testclient.get(
+        '/pipelines?uri=http://test.com', headers=headers
+    )
     assert response.status_code == status.HTTP_200_OK
     pipes = response.json()
     assert len(pipes) == 1, 'Only one pipeline matches the uri'
-    assert pipes[0] == second_pipeline
+    assert pipes[0] == second_pipeline.model_dump()
 def test_get_known_pipeline(async_minimum, fastapi_testclient):
-    response = fastapi_testclient.get('/pipelines/ptest one')
+
+    response = fastapi_testclient.get(
+        '/pipelines/ptest one', headers=headers
+    )
     assert response.status_code == status.HTTP_200_OK
-    pipeline = Pipeline.parse_obj(response.json())
+    pipeline = Pipeline.model_validate(response.json())
     assert pipeline, 'Response fits into the over-the-wire model'
     assert pipeline.name == 'ptest one'
     assert pipeline.version == '0.3.14'
-    response = fastapi_testclient.get('/pipelines/not here')
+    response = fastapi_testclient.get(
+        '/pipelines/not here', headers=headers
+    )
     assert response.status_code == status.HTTP_404_NOT_FOUND
     assert response.json()['detail'] == "Pipeline 'not here' not found"
-def test_create_pipeline(fastapi_testclient):
+
+def test_create_pipeline(async_minimum, fastapi_testclient):
+
     # Create a pipeline
     desired_pipeline = Pipeline(
         name='ptest two',
@@ -67,16 +109,16 @@ def test_create_pipeline(fastapi_testclient):
     )
     response = http_create_pipeline(fastapi_testclient, desired_pipeline)
-    pipeline = Pipeline.parse_obj(response)
+    pipeline = Pipeline.model_validate(response)
     assert pipeline == desired_pipeline, 'Got back what we put in'
     # Create the same pipeline
     response = fastapi_testclient.post(
         '/pipelines',
-        json=desired_pipeline.dict(),
-        allow_redirects=True
+        json=desired_pipeline.model_dump(),
+        follow_redirects=True,
+        headers=headers4power_user
     )
-
     assert response.status_code == status.HTTP_409_CONFLICT, 'ptest two already in DB'
     assert response.json()['detail'] == 'Pipeline already exists'
@@ -90,12 +132,11 @@ def test_create_pipeline(fastapi_testclient):
     response = http_create_pipeline(fastapi_testclient, second_desired_pipeline)
     # Retrieve the same pipelines
-
     response = fastapi_testclient.get(
-        '/pipelines'
+        '/pipelines', headers=headers
     )
     assert response.status_code == status.HTTP_200_OK
-    assert response.json() == [desired_pipeline, second_desired_pipeline]
+    assert response.json()[1:] == [desired_pipeline.model_dump(), second_desired_pipeline.model_dump()]
     # Create a very poorly provenanced pipeline
     third_desired_pipeline = Pipeline(
@@ -106,8 +147,8 @@ def test_create_pipeline(fastapi_testclient):
     response = fastapi_testclient.post(
         '/pipelines',
-        json=third_desired_pipeline.dict(),
-        allow_redirects=True
+        json=third_desired_pipeline.model_dump(),
+        follow_redirects=True,
+        headers=headers4power_user
     )
-
     assert response.status_code == status.HTTP_400_BAD_REQUEST
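Two mechanical migrations run through this file: pydantic v2 renamed the serialisation API (`Model.dict()` → `Model.model_dump()`, `Model.parse_obj()` → `Model.model_validate()`), and Starlette's `TestClient` now wraps httpx, which spells the redirect option `follow_redirects` rather than requests' `allow_redirects`. A minimal illustration of the pydantic side, with a throwaway stand-in model:

```python
from pydantic import BaseModel


class Pipeline(BaseModel):
    # Throwaway stand-in; the real model lives in npg.porch.models.
    name: str
    uri: str | None = None
    version: str | None = None


p = Pipeline.model_validate({'name': 'ptest one'})   # v1: Pipeline.parse_obj(...)
assert p.model_dump()['name'] == 'ptest one'         # v1: p.dict()['name']
```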
diff --git a/tests/task_route_test.py b/tests/task_route_test.py
new file mode 100644
index 0000000..c24bd6a
--- /dev/null
+++ b/tests/task_route_test.py
@@ -0,0 +1,216 @@
+from starlette import status
+
+from npg.porch.models import Task, TaskStateEnum, Pipeline
+
+# Not testing get-all-tasks, as this endpoint will eventually be removed
+
+headers4ptest_one = {
+    'Authorization': 'Bearer cac0533d5599489d9a3d998028a79fe8',
+    'accept': 'application/json'
+}
+headers4ptest_some = {
+    'Authorization': 'Bearer ba53eaf7073d4c2b95ca47aeed41086c',
+    'accept': 'application/json'
+}
+
+def test_task_creation(async_minimum, fastapi_testclient):
+
+    # Create a task with a sparse pipeline definition
+    task_one = Task(
+        pipeline = {
+            'name': 'ptest one'
+        },
+        task_input = {
+            'number': 1
+        }
+    )
+
+    response = fastapi_testclient.post(
+        'tasks',
+        json=task_one.model_dump(),
+        follow_redirects=True,
+        headers=headers4ptest_one
+    )
+    assert response.status_code == status.HTTP_201_CREATED
+    assert task_one == response.json()
+
+    # Try again and expect to fail
+    response = fastapi_testclient.post(
+        'tasks',
+        json=task_one.model_dump(),
+        follow_redirects=True,
+        headers=headers4ptest_one
+    )
+    assert response.status_code == status.HTTP_409_CONFLICT
+
+    task_two = Task(
+        pipeline = {
+            'name': 'ptest none'
+        },
+        task_input = {
+            'number': 1
+        }
+    )
+    # The token is valid, but for a different pipeline. It is impossible
+    # to have a valid token for a pipeline that does not exist.
+    response = fastapi_testclient.post(
+        'tasks',
+        json=task_two.model_dump(),
+        follow_redirects=True,
+        headers=headers4ptest_one
+    )
+    assert response.status_code == status.HTTP_403_FORBIDDEN
+
+
+def test_task_update(async_minimum, fastapi_testclient):
+
+    task = fastapi_testclient.get('/tasks', headers=headers4ptest_one).json()[0]
+    assert task['status'] is None
+
+    task['status'] = TaskStateEnum.PENDING
+    response = fastapi_testclient.put(
+        '/tasks',
+        json=task,
+        follow_redirects=True,
+        headers=headers4ptest_one
+    )
+    assert response.status_code == status.HTTP_200_OK
+
+    modified_task = Task.model_validate(response.json())
+    assert modified_task == task
+
+    # Now invalidate the task by changing the signature
+    modified_task.task_input = {
+        'something': 'different'
+    }
+    response = fastapi_testclient.put(
+        '/tasks',
+        json=modified_task.model_dump(),
+        follow_redirects=True,
+        headers=headers4ptest_one
+    )
+    assert response.status_code == status.HTTP_404_NOT_FOUND
+    assert response.json() == {'detail': 'Task to be modified could not be found'}
+
+    # And change the reference pipeline to something wrong.
+    # This token is valid, but for a different pipeline. It is impossible
+    # to have a valid token for a pipeline that does not exist.
+    modified_task.pipeline = Pipeline.model_validate({
+        'name': 'ptest one thousand'
+    })
+    response = fastapi_testclient.put(
+        '/tasks',
+        json=modified_task.model_dump(),
+        follow_redirects=True,
+        headers=headers4ptest_one
+    )
+    assert response.status_code == status.HTTP_403_FORBIDDEN
+
+
+def test_task_claim(async_minimum, async_tasks, fastapi_testclient):
+
+    response = fastapi_testclient.get(
+        '/pipelines/ptest some', headers=headers4ptest_one)
+    assert response.status_code == status.HTTP_200_OK
+
+    pipeline = response.json()
+    tasks_seen = []
+
+    # Cannot claim with a token issued for a different pipeline.
+    response = fastapi_testclient.post(
+        '/tasks/claim',
+        json=pipeline,
+        headers=headers4ptest_one
+    )
+    assert response.status_code == status.HTTP_403_FORBIDDEN
+
+    response = fastapi_testclient.post(
+        '/tasks/claim',
+        json=pipeline,
+        headers=headers4ptest_some
+    )
+    assert response.status_code == status.HTTP_200_OK
+    tasks = response.json()
+    assert len(tasks) == 1, 'Defaults to one task claimed'
+    t = tasks[0]
+    assert t['task_input'] == {'input': 1}
+    assert t['status'] == TaskStateEnum.CLAIMED
+    tasks_seen.append(t['task_input_id'])
+
+    response = fastapi_testclient.post(
+        '/tasks/claim?num_tasks=0',
+        json=pipeline,
+        headers=headers4ptest_some
+    )
+    assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY, 'Not allowed to use invalid numbers of tasks' # noqa: E501
+
+    response = fastapi_testclient.post(
+        '/tasks/claim?num_tasks=2',
+        json=pipeline,
+        headers=headers4ptest_some
+    )
+    assert response.status_code == status.HTTP_200_OK
+    tasks = response.json()
+    assert len(tasks) == 2, 'Asked for two, got two'
+    tasks_seen.extend([t['task_input_id'] for t in tasks])
+
+    # Cannot test race conditions, because sqlite only pretends to support full async
+    # Claim the rest
+    response = fastapi_testclient.post(
+        '/tasks/claim?num_tasks=8',
+        json=pipeline,
+        headers=headers4ptest_some
+    )
+    assert response.status_code == status.HTTP_200_OK
+    tasks = response.json()
+    assert len(tasks) == 7, 'Asked for eight, got seven'
+    tasks_seen.extend([t['task_input_id'] for t in tasks])
+    assert len(set(tasks_seen)) == 10, 'Ten unique tasks were claimed'
+
+    response = fastapi_testclient.post(
+        '/tasks/claim',
+        json=pipeline,
+        headers=headers4ptest_some
+    )
+    assert response.status_code == status.HTTP_200_OK
+    tasks = response.json()
+    assert len(tasks) == 0, 'Tried to claim, did not get any tasks'
+
+def test_get_tasks(async_minimum, async_tasks, fastapi_testclient):
+    response = fastapi_testclient.get('/tasks')
+    assert response.status_code == status.HTTP_403_FORBIDDEN, 'Need a token to see any tasks'
+
+    response = fastapi_testclient.get('/tasks', headers=headers4ptest_one)
+    assert response.status_code == status.HTTP_200_OK, 'Authorised task fetching'
+    tasks = response.json()
+
+    unique_pipelines = {t['pipeline']['name'] for t in tasks}
+
+    assert 'ptest one' in unique_pipelines, 'Tasks for pipeline present with relevant token'
+    assert 'ptest some' in unique_pipelines, 'Tasks for other pipelines also present with token'
+
+    response = fastapi_testclient.get(
+        '/tasks?pipeline_name=ptest one',
+        headers=headers4ptest_one
+    )
+    assert response.status_code == status.HTTP_200_OK, 'One optional argument works'
+    tasks = response.json()
+    assert len(tasks) == 2, 'Most tasks now filtered'
+    assert {t['pipeline']['name'] for t in tasks} == {'ptest one'}, 'All tasks belong to pipeline'
+
+    response = fastapi_testclient.get(
+        '/tasks?status=PENDING',
+        headers=headers4ptest_one
+    )
+    assert response.status_code == status.HTTP_200_OK, 'Other optional argument works'
+    tasks = response.json()
+    assert len(tasks) == 10, 'Ten pending tasks selected'
+
+    response = fastapi_testclient.get(
+        '/tasks?pipeline_name="ptest one"&status=PENDING',
+        headers=headers4ptest_one
+    )
+    assert response.status_code == status.HTTP_200_OK, 'Both arguments together work'
+    print(response.text)
+    tasks = response.json()
+    assert len(tasks) == 0, 'but no tasks are returned that match status and pipeline'
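Taken together, the claim tests pin down the endpoint's contract: a claim defaults to one task, `num_tasks=N` batches up to N (returning fewer when the queue runs dry), an exhausted queue yields an empty list, and a token issued for another pipeline gets 403. A sketch of how a pipeline worker might consume this contract, assuming an `httpx` client; the URL and token are placeholders and `run_pipeline` is a hypothetical stand-in:

```python
import httpx

PORCH_URL = 'http://localhost:8080'                # placeholder
TOKEN = 'cac0533d5599489d9a3d998028a79fe8'         # placeholder pipeline token

headers = {'Authorization': f'Bearer {TOKEN}', 'accept': 'application/json'}
pipeline = {'name': 'ptest one'}

with httpx.Client(base_url=PORCH_URL, headers=headers) as client:
    # Ask for up to five tasks; the server may return fewer, or none at all.
    response = client.post('/tasks/claim', params={'num_tasks': 5}, json=pipeline)
    response.raise_for_status()
    for task in response.json():
        # Each claimed task comes back with status CLAIMED and its task_input.
        run_pipeline(task['task_input'])  # hypothetical worker function
```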