From 4397d2ec34fe3712bb62792937bc95c00556b5fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AF=D0=BA=D0=B8=D0=BC=D0=B5=D0=BD=D0=BA=D0=BE=D0=B2=20?= =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=90=D0=BB=D0=B5=D0=BA?= =?UTF-8?q?=D1=81=D0=B0=D0=BD=D0=B4=D1=80=D0=BE=D0=B2=D0=B8=D1=87?= Date: Fri, 8 Nov 2024 10:34:30 +0300 Subject: [PATCH] [DOP-20061] add tests with None stats --- .../test_lineage/test_job_lineage.py | 74 ----------- .../test_lineage/test_operation_lineage.py | 121 ++++++++++++++++++ 2 files changed, 121 insertions(+), 74 deletions(-) diff --git a/tests/test_server/test_lineage/test_job_lineage.py b/tests/test_server/test_lineage/test_job_lineage.py index 3c06281b..f71670d3 100644 --- a/tests/test_server/test_lineage/test_job_lineage.py +++ b/tests/test_server/test_lineage/test_job_lineage.py @@ -1115,77 +1115,3 @@ async def test_get_job_lineage_with_symlinks( for dataset in sorted(datasets, key=lambda x: x.id) ], } - - -async def test_get_job_lineage_with_empty_relation_stats( - test_client: AsyncClient, - async_session: AsyncSession, - lineage_with_empty_relation_stats: LINEAGE_FIXTURE_ANNOTATION, -): - all_jobs, all_runs, _, all_datasets, _, all_outputs = lineage_with_empty_relation_stats - - job = all_jobs[0] - outputs = [output for output in all_outputs if output.job_id == job.id] - dataset_ids = {output.dataset_id for output in outputs} - datasets = [dataset for dataset in all_datasets if dataset.id in dataset_ids] - - [job] = await enrich_jobs([job], async_session) - datasets = await enrich_datasets(datasets, async_session) - - since = min(run.created_at for run in all_runs if run.job_id == job.id) - response = await test_client.get( - "v1/jobs/lineage", - params={ - "since": since.isoformat(), - "start_node_id": job.id, - "direction": "DOWNSTREAM", - }, - ) - - assert response.status_code == HTTPStatus.OK, response.json() - assert response.json() == { - "relations": [ - { - "kind": "OUTPUT", - "from": {"kind": "JOB", "id": job.id}, - "to": {"kind": "DATASET", "id": output.dataset_id}, - "type": "APPEND", - "num_bytes": None, - "num_rows": None, - "num_files": None, - "last_interaction_at": max([output.created_at for output in outputs]).strftime("%Y-%m-%dT%H:%M:%S.%fZ"), - } - for output in outputs - ], - "nodes": [ - { - "kind": "JOB", - "id": job.id, - "name": job.name, - "type": job.type, - "location": { - "id": job.location.id, - "name": job.location.name, - "type": job.location.type, - "addresses": [{"url": address.url} for address in job.location.addresses], - "external_id": job.location.external_id, - }, - }, - ] - + [ - { - "kind": "DATASET", - "id": dataset.id, - "format": dataset.format, - "name": dataset.name, - "location": { - "id": dataset.location.id, - "name": dataset.location.name, - "type": dataset.location.type, - "addresses": [{"url": address.url} for address in dataset.location.addresses], - "external_id": dataset.location.external_id, - }, - } - for dataset in sorted(datasets, key=lambda x: x.id) - ], - } diff --git a/tests/test_server/test_lineage/test_operation_lineage.py b/tests/test_server/test_lineage/test_operation_lineage.py index 078fd70b..b0cd5a63 100644 --- a/tests/test_server/test_lineage/test_operation_lineage.py +++ b/tests/test_server/test_lineage/test_operation_lineage.py @@ -990,3 +990,124 @@ async def test_get_operation_lineage_with_symlinks( }, ], } + + +async def test_get_operation_lineage_with_empty_relation_stats( + test_client: AsyncClient, + async_session: AsyncSession, + lineage_with_empty_relation_stats: LINEAGE_FIXTURE_ANNOTATION, +): + all_jobs, all_runs, all_operations, all_datasets, _, all_outputs = lineage_with_empty_relation_stats + + operation = all_operations[0] + outputs = [output for output in all_outputs if output.operation_id == operation.id] + dataset_ids = {output.dataset_id for output in outputs} + datasets = [dataset for dataset in all_datasets if dataset.id in dataset_ids] + run = next(run for run in all_runs if run.id == operation.run_id) + job = next(job for job in all_jobs if job.id == run.job_id) + datasets = await enrich_datasets(datasets, async_session) + [job] = await enrich_jobs([job], async_session) + [run] = await enrich_runs([run], async_session) + + since = run.created_at + response = await test_client.get( + "v1/operations/lineage", + params={ + "since": since.isoformat(), + "start_node_id": str(operation.id), + "direction": "DOWNSTREAM", + }, + ) + + assert response.status_code == HTTPStatus.OK, response.json() + assert response.json() == { + "relations": [ + { + "kind": "PARENT", + "from": {"kind": "JOB", "id": run.job_id}, + "to": {"kind": "RUN", "id": str(run.id)}, + }, + { + "kind": "PARENT", + "from": {"kind": "RUN", "id": str(operation.run_id)}, + "to": {"kind": "OPERATION", "id": str(operation.id)}, + }, + ] + + [ + { + "kind": "OUTPUT", + "from": {"kind": "OPERATION", "id": str(operation.id)}, + "to": {"kind": "DATASET", "id": output.dataset_id}, + "type": "APPEND", + "num_bytes": None, + "num_rows": None, + "num_files": None, + "last_interaction_at": max([output.created_at for output in outputs]).strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + } + for output in outputs + ], + "nodes": [ + { + "kind": "JOB", + "id": job.id, + "name": job.name, + "type": job.type, + "location": { + "id": job.location.id, + "name": job.location.name, + "type": job.location.type, + "addresses": [{"url": address.url} for address in job.location.addresses], + "external_id": job.location.external_id, + }, + }, + ] + + [ + { + "kind": "DATASET", + "id": dataset.id, + "format": dataset.format, + "name": dataset.name, + "location": { + "id": dataset.location.id, + "name": dataset.location.name, + "type": dataset.location.type, + "addresses": [{"url": address.url} for address in dataset.location.addresses], + "external_id": dataset.location.external_id, + }, + } + for dataset in sorted(datasets, key=lambda x: x.id) + ] + + [ + { + "kind": "RUN", + "id": str(run.id), + "job_id": run.job_id, + "created_at": run.created_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + "parent_run_id": str(run.parent_run_id), + "status": run.status.name, + "external_id": run.external_id, + "attempt": run.attempt, + "persistent_log_url": run.persistent_log_url, + "running_log_url": run.running_log_url, + "started_at": run.started_at.strftime("%Y-%m-%dT%H:%M:%SZ"), + "started_by_user": {"name": run.started_by_user.name}, + "start_reason": run.start_reason.value, + "ended_at": run.ended_at.strftime("%Y-%m-%dT%H:%M:%SZ"), + "end_reason": run.end_reason, + }, + { + "kind": "OPERATION", + "id": str(operation.id), + "created_at": operation.created_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + "run_id": str(operation.run_id), + "name": operation.name, + "status": operation.status.name, + "type": operation.type.value, + "position": operation.position, + "group": operation.group, + "description": operation.description, + "started_at": operation.started_at.strftime("%Y-%m-%dT%H:%M:%SZ"), + "ended_at": operation.ended_at.strftime("%Y-%m-%dT%H:%M:%SZ"), + }, + ], + }