From e7e508082c2b5c44fa3cc66059529e9e24f70455 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Fri, 15 Nov 2024 18:38:52 +0300 Subject: [PATCH] [DOP-21295] Simplify tests a bit --- .../test_server/fixtures/factories/lineage.py | 88 +++++++----- .../test_lineage/test_dataset_lineage.py | 127 +++++++++--------- .../test_lineage/test_job_lineage.py | 93 +++++-------- .../test_lineage/test_operation_lineage.py | 33 ++--- .../test_lineage/test_run_lineage.py | 55 ++++---- 5 files changed, 192 insertions(+), 204 deletions(-) diff --git a/tests/test_server/fixtures/factories/lineage.py b/tests/test_server/fixtures/factories/lineage.py index 34ca309..281be38 100644 --- a/tests/test_server/fixtures/factories/lineage.py +++ b/tests/test_server/fixtures/factories/lineage.py @@ -24,12 +24,17 @@ async def simple_lineage( job: Job, user: User, ) -> AsyncGenerator[LineageResult, None]: - # This fixture generates a simple lineage graph with 1 job, 2 runs, 2 operations for each run, and 4 datasets. - # The structure is as follows: J --> R; R --> O1; R --> O2; D0 --> O1 --> D1; D2 --> O2 --> D3. + # Two independent operations, run twice: + # J1 -> R1 -> O1, D1 -> O1 -> D2 + # J1 -> R1 -> O2, D3 -> O2 -> D4 + # J1 -> R2 -> O3, D1 -> O3 -> D2 + # J1 -> R2 -> O4, D3 -> O4 -> D4 - lineage = LineageResult() - lineage.jobs.append(job) num_runs = 2 + num_operations = 2 + num_datasets = 4 + + lineage = LineageResult(jobs=[job]) async with async_session_maker() as async_session: created_at = datetime.now() for n in range(num_runs): @@ -52,11 +57,11 @@ async def simple_lineage( "created_at": run.created_at + timedelta(seconds=0.2 * i), }, ) - for i in range(2) + for i in range(num_operations) ] lineage.operations.extend(operations) - dataset_locations = [await create_location(async_session) for _ in range(4)] + dataset_locations = [await create_location(async_session) for _ in range(num_datasets)] datasets = [await create_dataset(async_session, location_id=location.id) for location in dataset_locations] lineage.datasets.extend(datasets) @@ -168,10 +173,10 @@ async def three_days_lineage( lineage.outputs.extend(outputs) async_session.expunge_all() - yield lineage + yield lineage - async with async_session_maker() as async_session: - await clean_db(async_session) + async with async_session_maker() as async_session: + await clean_db(async_session) @pytest_asyncio.fixture() @@ -179,18 +184,23 @@ async def lineage_with_depth( async_session_maker: Callable[[], AsyncContextManager[AsyncSession]], user: User, ): - # This fixture generates a lineage with three simultaneous trees, structures as: J --> R --> O, - # connected through Input and Output datasets. - create_at = datetime.now() + # Three trees of J -> R -> O, connected via datasets: + # J1 -> R1 -> O1, D1 -> O1 -> D2 + # J2 -> R2 -> O2, D2 -> O2 -> D3 + # J3 -> R3 -> O3, D3 -> O3 -> D4 + + num_datasets = 4 + num_jobs = 3 + created_at = datetime.now() + lineage = LineageResult() async with async_session_maker() as async_session: - dataset_locations = [await create_location(async_session) for _ in range(4)] + dataset_locations = [await create_location(async_session) for _ in range(num_datasets)] datasets = [await create_dataset(async_session, location_id=location.id) for location in dataset_locations] lineage.datasets.extend(datasets) # Create a job, run and operation with IO datasets. - for i in range(3): - + for i in range(num_jobs): job_location = await create_location(async_session) job = await create_job(async_session, location_id=job_location.id) lineage.jobs.append(job) @@ -200,7 +210,7 @@ async def lineage_with_depth( run_kwargs={ "job_id": job.id, "started_by_user_id": user.id, - "created_at": create_at + timedelta(seconds=i), + "created_at": created_at + timedelta(seconds=i), }, ) lineage.runs.append(run) @@ -239,10 +249,10 @@ async def lineage_with_depth( ) lineage.outputs.append(output) - yield lineage + yield lineage - async with async_session_maker() as async_session: - await clean_db(async_session) + async with async_session_maker() as async_session: + await clean_db(async_session) @pytest_asyncio.fixture() @@ -250,16 +260,19 @@ async def lineage_with_depth_and_cycle( async_session_maker: Callable[[], AsyncContextManager[AsyncSession]], user: User, ): - # This fixture generates a lineage with two trees: J --> R --> O, - # connected through a single Dataset, forming a cycle. + # Two trees of J -> R -> O, connected via one dataset, forming cycles: + # J1 -> R1 -> O1, D -> O1 -> D + # J2 -> R2 -> O2, D -> O2 -> D + created_at = datetime.now() + num_jobs = 2 lineage = LineageResult() async with async_session_maker() as async_session: dataset_location = await create_location(async_session) dataset = await create_dataset(async_session, location_id=dataset_location.id) lineage.datasets.append(dataset) - jobs_location = [await create_location(async_session) for _ in range(2)] + jobs_location = [await create_location(async_session) for _ in range(num_jobs)] jobs = [await create_job(async_session, location_id=location.id) for location in jobs_location] lineage.jobs.extend(jobs) @@ -321,10 +334,11 @@ async def lineage_with_depth_and_cycle( lineage.runs.sort(key=lambda x: x.id) lineage.jobs.sort(key=lambda x: x.id) - yield lineage - async with async_session_maker() as async_session: - await clean_db(async_session) + yield lineage + + async with async_session_maker() as async_session: + await clean_db(async_session) @pytest_asyncio.fixture() @@ -332,18 +346,26 @@ async def lineage_with_symlinks( async_session_maker: Callable[[], AsyncContextManager[AsyncSession]], user: User, ) -> AsyncGenerator[LineageResult, None]: - # This fixture generates three simple lineage trees, structured as: J --> R --> O, D0 --> O --> D1S. - # Each dataset has a symlink dataset, and the trees are connected through these symlink datasets. + # Three trees of J -> R -> O, connected to datasets via symlinks: + # J1 -> R1 -> O1, D1 -> O1 -> D2S + # J2 -> R2 -> O2, D2 -> O2 -> D3S + # J3 -> R3 -> O3, D3 -> O2 -> D4S lineage = LineageResult() created_at = datetime.now() + num_datasets = 4 + num_jobs = 3 async with async_session_maker() as async_session: - dataset_locations = [await create_location(async_session, location_kwargs={"type": "hdfs"}) for _ in range(4)] + dataset_locations = [ + await create_location(async_session, location_kwargs={"type": "hdfs"}) for _ in range(num_datasets) + ] datasets = [await create_dataset(async_session, location_id=location.id) for location in dataset_locations] lineage.datasets.extend(datasets) - symlink_locations = [await create_location(async_session, location_kwargs={"type": "hive"}) for _ in range(4)] + symlink_locations = [ + await create_location(async_session, location_kwargs={"type": "hive"}) for _ in range(num_datasets) + ] symlink_datasets = [ await create_dataset(async_session, location_id=location.id) for location in symlink_locations ] @@ -358,7 +380,7 @@ async def lineage_with_symlinks( lineage.dataset_symlinks.extend(warehouse) # Make graphs - for i in range(3): + for i in range(num_jobs): job_location = await create_location(async_session) job = await create_job(async_session, location_id=job_location.id) lineage.jobs.append(job) @@ -420,6 +442,12 @@ async def lineage_with_empty_relation_stats( ) -> AsyncGenerator[LineageResult, None]: # This fixture generates a simple lineage tree structured as J --> R --> O and D0 --> 0 --> D1, # with empty statistics in both Input and Output. + + # Three trees of J -> R -> O, connected to datasets via symlinks: + # J1 -> R1 -> O1, D1 -> O1 -> D2S + # J2 -> R2 -> O2, D2 -> O2 -> D3S + # J3 -> R3 -> O3, D3 -> O2 -> D4S + created_at = datetime.now() lineage = LineageResult() lineage.jobs.append(job) diff --git a/tests/test_server/test_lineage/test_dataset_lineage.py b/tests/test_server/test_lineage/test_dataset_lineage.py index 740a4ec..6f7eeaa 100644 --- a/tests/test_server/test_lineage/test_dataset_lineage.py +++ b/tests/test_server/test_lineage/test_dataset_lineage.py @@ -688,44 +688,38 @@ async def test_get_dataset_lineage_with_depth_and_granularity_run( lineage_with_depth: LineageResult, ): lineage = lineage_with_depth + # Select only relations marked with * + # J1 -*> R1, D1 -*> R1 -*> D2 + # J2 -*> R2, D2 -*> R2 --> D3 + # J3 --> R3, D3 --> R3 --> D4 # Go dataset -> runs[first level] first_level_dataset = lineage.datasets[0] first_level_inputs = [input for input in lineage.inputs if input.dataset_id == first_level_dataset.id] - first_level_run_ids = {output.run_id for output in first_level_inputs} - first_level_runs = [run for run in lineage.runs if run.id in first_level_run_ids] - assert first_level_runs + first_level_run_ids = {input.run_id for input in first_level_inputs} + assert first_level_run_ids # Go runs[first level] -> datasets[second level] second_level_outputs = [output for output in lineage.outputs if output.run_id in first_level_run_ids] - second_level_dataset_ids = {output.dataset_id for output in second_level_outputs} - second_level_datasets = [dataset for dataset in lineage.datasets if dataset.id in second_level_dataset_ids] - assert second_level_datasets - - second_level_run_ids = {output.run_id for output in second_level_outputs} - second_level_runs = [run for run in lineage.runs if run.id in second_level_run_ids] - assert second_level_runs + second_level_dataset_ids = {output.dataset_id for output in second_level_outputs} - {first_level_dataset.id} + assert second_level_dataset_ids # Go datasets[second level] -> runs[third level] # There are more levels in this graph, but we stop here - third_level_dataset_ids = second_level_dataset_ids - {first_level_dataset.id} - third_level_inputs = [input for input in lineage.inputs if input.dataset_id in third_level_dataset_ids] - third_level_datasets = [dataset for dataset in lineage.datasets if dataset.id in third_level_dataset_ids] - assert third_level_datasets - - third_level_run_ids = {input.run_id for input in third_level_inputs} - second_level_run_ids - first_level_run_ids - third_level_runs = [run for run in lineage.runs if run.id in third_level_run_ids] - assert third_level_runs + third_level_inputs = [input for input in lineage.inputs if input.dataset_id in second_level_dataset_ids] + assert third_level_inputs + third_level_run_ids = {output.run_id for output in third_level_inputs} - first_level_run_ids + assert third_level_run_ids inputs = first_level_inputs + third_level_inputs input_stats = relation_stats_by_runs(inputs) outputs = second_level_outputs output_stats = relation_stats_by_runs(outputs) - dataset_ids = {first_level_dataset.id} | second_level_dataset_ids | third_level_dataset_ids + dataset_ids = {first_level_dataset.id} | second_level_dataset_ids datasets = [dataset for dataset in lineage.datasets if dataset.id in dataset_ids] - run_ids = first_level_run_ids | second_level_run_ids | third_level_run_ids + run_ids = first_level_run_ids | third_level_run_ids runs = [run for run in lineage.runs if run.id in run_ids] job_ids = {run.job_id for run in runs} @@ -846,35 +840,40 @@ async def test_get_dataset_lineage_with_depth_and_granularity_job( lineage_with_depth: LineageResult, ): lineage = lineage_with_depth + # Select only relations marked with * + # D1 -*> J1 -*> D2 + # D2 -*> J2 --> D3 + # D3 --> J3 --> D4 - some_input = lineage.inputs[0] - first_level_inputs = [input for input in lineage.inputs if input.dataset_id == some_input.dataset_id] - first_level_dataset_ids = {input.dataset_id for input in first_level_inputs} - first_level_datasets = [dataset for dataset in lineage.datasets if dataset.id in first_level_dataset_ids] + # Go dataset[first level] -> jobs[first level] + first_level_dataset = lineage.datasets[0] + first_level_inputs = [input for input in lineage.inputs if input.dataset_id == first_level_dataset.id] assert first_level_inputs - # Go inputs[first level] -> jobs[first level] first_level_job_ids = {input.job_id for input in first_level_inputs} - first_level_jobs = [job for job in lineage.jobs if job.id in first_level_job_ids] + assert first_level_job_ids - # Go job[first level] -> outputs[second level] + # Go job[first level] -> datasets[second level] second_level_outputs = [output for output in lineage.outputs if output.job_id in first_level_job_ids] - second_level_dataset_ids = {output.dataset_id for output in second_level_outputs} - second_level_datasets = [dataset for dataset in lineage.datasets if dataset.id in second_level_dataset_ids] assert second_level_outputs + second_level_dataset_ids = {output.dataset_id for output in second_level_outputs} + assert second_level_dataset_ids - # Go outputs[second level] -> jobs[third level] + # Go datasets[second level] -> jobs[third level] third_level_inputs = [input for input in lineage.inputs if input.dataset_id in second_level_dataset_ids] + assert third_level_inputs third_level_job_ids = {input.job_id for input in third_level_inputs} - first_level_job_ids - third_level_jobs = [job for job in lineage.jobs if job.id in third_level_job_ids] - assert third_level_jobs + assert third_level_job_ids outputs = second_level_outputs output_stats = relation_stats_by_jobs(outputs) inputs = first_level_inputs + third_level_inputs input_stats = relation_stats_by_jobs(inputs) - datasets = first_level_datasets + second_level_datasets - jobs = first_level_jobs + third_level_jobs + dataset_ids = {first_level_dataset.id} | second_level_dataset_ids + datasets = [dataset for dataset in lineage.datasets if dataset.id in dataset_ids] + + job_ids = first_level_job_ids | third_level_job_ids + jobs = [job for job in lineage.jobs if job.id in job_ids] jobs = await enrich_jobs(jobs, async_session) datasets = await enrich_datasets(datasets, async_session) @@ -884,7 +883,7 @@ async def test_get_dataset_lineage_with_depth_and_granularity_job( "v1/datasets/lineage", params={ "since": since.isoformat(), - "start_node_id": some_input.dataset_id, + "start_node_id": first_level_dataset.id, "direction": "DOWNSTREAM", "depth": 3, "granularity": "JOB", @@ -963,50 +962,46 @@ async def test_get_dataset_lineage_with_depth_and_granularity_operation( lineage_with_depth: LineageResult, ): lineage = lineage_with_depth + # Select only relations marked with * + # J1 -*> R1 -*> O1, D1 *-> O1 -*> D2 + # J2 -*> R2 -*> O2, D2 -*> O2 --> D3 + # J3 --> R3 --> O3, D3 --> O3 --> D4 - some_input = lineage.inputs[0] - first_level_inputs = [input for input in lineage.inputs if input.dataset_id == some_input.dataset_id] - first_level_dataset_ids = {input.dataset_id for input in first_level_inputs} - first_level_datasets = [dataset for dataset in lineage.datasets if dataset.id in first_level_dataset_ids] - - # Go inputs[first level] -> operations[first level] + runs[first level] + jobs[first level] + # Go datasets[first level] -> operations[first level] + runs[first level] + jobs[first level] + first_level_dataset = lineage.datasets[0] + first_level_inputs = [input for input in lineage.inputs if input.dataset_id == first_level_dataset.id] + assert first_level_inputs first_level_operation_ids = {input.operation_id for input in first_level_inputs} - first_level_operations = [ - operation for operation in lineage.operations if operation.id in first_level_operation_ids - ] - first_level_run_ids = {operation.run_id for operation in first_level_operations} - first_level_runs = [run for run in lineage.runs if run.id in first_level_run_ids] - first_level_job_ids = {run.job_id for run in first_level_runs} - first_level_jobs = [job for job in lineage.jobs if job.id in first_level_job_ids] - assert first_level_runs + assert first_level_operation_ids - # Go operations[first level] -> outputs[second level] + # Go operations[first level] -> datasets[second level] second_level_outputs = [output for output in lineage.outputs if output.operation_id in first_level_operation_ids] - second_level_dataset_ids = {output.dataset_id for output in second_level_outputs} - second_level_datasets = [dataset for dataset in lineage.datasets if dataset.id in second_level_dataset_ids] assert second_level_outputs + second_level_dataset_ids = {output.dataset_id for output in second_level_outputs} + assert second_level_dataset_ids - # Go outputs[second level] -> operations[third level] + runs[third level] + jobs[third level] + # Go datasets[second level] -> operations[third level] + runs[third level] + jobs[third level] third_level_inputs = [input for input in lineage.inputs if input.dataset_id in second_level_dataset_ids] + assert third_level_inputs third_level_operation_ids = {input.operation_id for input in third_level_inputs} - first_level_operation_ids - third_level_operations = [ - operation for operation in lineage.operations if operation.id in third_level_operation_ids - ] - third_level_run_ids = {operation.run_id for operation in third_level_operations} - first_level_run_ids - third_level_runs = [run for run in lineage.runs if run.id in third_level_run_ids] - third_level_job_ids = {run.job_id for run in third_level_runs} - first_level_job_ids - third_level_jobs = [job for job in lineage.jobs if job.id in third_level_job_ids] - assert third_level_runs + assert third_level_operation_ids outputs = second_level_outputs output_stats = relation_stats_by_operations(outputs) inputs = first_level_inputs + third_level_inputs input_stats = relation_stats_by_operations(inputs) - datasets = first_level_datasets + second_level_datasets - operations = first_level_operations + third_level_operations - runs = first_level_runs + third_level_runs - jobs = first_level_jobs + third_level_jobs + dataset_ids = {first_level_dataset.id} | second_level_dataset_ids + datasets = [dataset for dataset in lineage.datasets if dataset.id in dataset_ids] + + operation_ids = first_level_operation_ids | third_level_operation_ids + operations = [operation for operation in lineage.operations if operation.id in operation_ids] + + run_ids = {input.run_id for input in inputs} + runs = [run for run in lineage.runs if run.id in run_ids] + + job_ids = {input.job_id for input in inputs} + jobs = [job for job in lineage.jobs if job.id in job_ids] datasets = await enrich_datasets(datasets, async_session) runs = await enrich_runs(runs, async_session) @@ -1017,7 +1012,7 @@ async def test_get_dataset_lineage_with_depth_and_granularity_operation( "v1/datasets/lineage", params={ "since": since.isoformat(), - "start_node_id": some_input.dataset_id, + "start_node_id": first_level_dataset.id, "direction": "DOWNSTREAM", "depth": 3, "granularity": "OPERATION", diff --git a/tests/test_server/test_lineage/test_job_lineage.py b/tests/test_server/test_lineage/test_job_lineage.py index f0520f9..db12060 100644 --- a/tests/test_server/test_lineage/test_job_lineage.py +++ b/tests/test_server/test_lineage/test_job_lineage.py @@ -5,15 +5,7 @@ from httpx import AsyncClient from sqlalchemy.ext.asyncio import AsyncSession -from data_rentgen.db.models import ( - Dataset, - DatasetSymlink, - Input, - Job, - Operation, - Output, - Run, -) +from data_rentgen.db.models import Job, Run from tests.test_server.utils.enrich import enrich_datasets, enrich_jobs, enrich_runs from tests.test_server.utils.lineage_result import LineageResult from tests.test_server.utils.stats import relation_stats, relation_stats_by_jobs @@ -132,7 +124,6 @@ async def test_get_job_lineage_no_inputs_outputs( async_session: AsyncSession, job: Job, run: Run, - operation: Operation, direction: str, ): response = await test_client.get( @@ -149,7 +140,7 @@ async def test_get_job_lineage_no_inputs_outputs( assert response.status_code == HTTPStatus.OK, response.json() assert response.json() == { - # runs & operations without inputs/outputs are excluded, + # runs without inputs/outputs are excluded, # but job is left intact "relations": [], "nodes": [ @@ -529,27 +520,12 @@ async def test_get_job_lineage_with_direction_and_until_and_run_granularity( run_ids = {run.id for run in raw_runs} assert raw_runs - raw_operations = [ - operation - for operation in lineage.operations - if operation.run_id in run_ids and since <= operation.created_at <= until - ] - operation_ids = {operation.id for operation in raw_operations} - assert raw_operations - - inputs = [ - input for input in lineage.inputs if input.operation_id in operation_ids and since <= input.created_at <= until - ] + inputs = [input for input in lineage.inputs if input.run_id in run_ids and since <= input.created_at <= until] assert inputs input_stats = relation_stats(inputs) - # Only operations with some inputs are returned - operation_ids = {input.operation_id for input in inputs} - operations = [operation for operation in lineage.operations if operation.id in operation_ids] - assert operations - - # Same for runs - run_ids = {operation.run_id for operation in operations} + # Only runs with some inputs are returned + run_ids = {input.run_id for input in inputs} runs = [run for run in lineage.runs if run.id in run_ids] assert runs @@ -653,15 +629,17 @@ async def test_get_job_lineage_with_depth( lineage_with_depth: LineageResult, ): lineage = lineage_with_depth + # Select only relations marked with * + # D1 --> J1 -*> D2 + # D2 -*> J2 -*> D3 + # D3 --> J3 --> D4 - # Start from - job = lineage.jobs[1] + job = lineage.jobs[0] # Go job[first level] -> datasets[second level] first_level_outputs = [output for output in lineage.outputs if output.job_id == job.id] first_level_dataset_ids = {output.dataset_id for output in first_level_outputs} - first_level_datasets = [dataset for dataset in lineage.datasets if dataset.id in first_level_dataset_ids] - assert first_level_datasets + assert first_level_dataset_ids # Go datasets[second level] -> jobs[second level] second_level_inputs = [input for input in lineage.inputs if input.dataset_id in first_level_dataset_ids] @@ -673,8 +651,7 @@ async def test_get_job_lineage_with_depth( # There are more levels in this graph, but we stop here third_level_outputs = [output for output in lineage.outputs if output.job_id in second_level_job_ids] third_level_dataset_ids = {output.dataset_id for output in third_level_outputs} - first_level_dataset_ids - third_level_datasets = [dataset for dataset in lineage.datasets if dataset.id in third_level_dataset_ids] - assert third_level_datasets + assert third_level_dataset_ids inputs = second_level_inputs input_stats = relation_stats(inputs) @@ -768,38 +745,31 @@ async def test_get_job_lineage_with_depth_and_run_granularity( lineage_with_depth: LineageResult, ): lineage = lineage_with_depth + # Select only relations marked with * + # J1 -*> R1, D1 --> R1 -*> D2 + # J2 -*> R2, D2 -*> R2 -*> D3 + # J3 --> R3, D3 --> R3 --> D4 - job = lineage.jobs[0] + first_level_job_id = lineage.jobs[0].id - # Go output[job] -> operations[first level] -> runs[first level] - first_level_outputs = [output for output in lineage.outputs if output.job_id == job.id] - first_level_operation_ids = {output.operation_id for output in first_level_outputs} - first_level_operations = [ - operation for operation in lineage.operations if operation.id in first_level_operation_ids - ] - assert first_level_operations - first_level_run_ids = {operation.run_id for operation in first_level_operations} + # Go job[first level] -> runs[first level] -> outputs[first level] -> datasets[first level] + first_level_outputs = [output for output in lineage.outputs if output.job_id == first_level_job_id] + first_level_run_ids = {output.run_id for output in first_level_outputs} + assert first_level_run_ids first_level_dataset_ids = {output.dataset_id for output in first_level_outputs} - first_level_datasets = [dataset for dataset in lineage.datasets if dataset.id in first_level_dataset_ids] - assert first_level_datasets + assert first_level_dataset_ids - # Go datasets[second level] -> operations[second level] -> runs[second level] -> jobs[second level] + # Go datasets[first level] -> runs[second level] -> jobs[second level] second_level_inputs = [input for input in lineage.inputs if input.dataset_id in first_level_dataset_ids] - second_level_operation_ids = {input.operation_id for input in second_level_inputs} - first_level_operation_ids - second_level_operations = [ - operation for operation in lineage.operations if operation.id in second_level_operation_ids - ] - second_level_run_ids = {operation.run_id for operation in second_level_operations} - first_level_run_ids - second_level_runs = [run for run in lineage.runs if run.id in second_level_run_ids] - second_level_job_ids = {run.job_id for run in second_level_runs} - {job.id} - second_level_jobs = [job for job in lineage.jobs if job.id in second_level_job_ids] - assert second_level_jobs + second_level_run_ids = {input.run_id for input in second_level_inputs} - first_level_run_ids + assert second_level_run_ids + second_level_job_ids = {input.job_id for input in second_level_inputs} - {first_level_job_id} + assert second_level_job_ids # Go runs[second level] -> datasets[third level] third_level_outputs = [output for output in lineage.outputs if output.run_id in second_level_run_ids] third_level_dataset_ids = {output.dataset_id for output in third_level_outputs} - first_level_dataset_ids - third_level_datasets = [dataset for dataset in lineage.datasets if dataset.id in third_level_dataset_ids] - assert third_level_datasets + assert third_level_dataset_ids inputs = second_level_inputs input_stats = relation_stats(inputs) @@ -809,9 +779,10 @@ async def test_get_job_lineage_with_depth_and_run_granularity( dataset_ids = first_level_dataset_ids | third_level_dataset_ids datasets = [dataset for dataset in lineage.datasets if dataset.id in dataset_ids] - jobs = [job] + second_level_jobs - run_ids = first_level_run_ids | second_level_run_ids + job_ids = {first_level_job_id} | second_level_job_ids + jobs = [job for job in lineage.jobs if job.id in job_ids] + run_ids = first_level_run_ids | second_level_run_ids runs = [run for run in lineage.runs if run.id in run_ids] jobs = await enrich_jobs(jobs, async_session) @@ -823,7 +794,7 @@ async def test_get_job_lineage_with_depth_and_run_granularity( "v1/jobs/lineage", params={ "since": since.isoformat(), - "start_node_id": job.id, + "start_node_id": first_level_job_id, "direction": "DOWNSTREAM", "granularity": "RUN", "depth": 3, diff --git a/tests/test_server/test_lineage/test_operation_lineage.py b/tests/test_server/test_lineage/test_operation_lineage.py index 549ca65..851a20b 100644 --- a/tests/test_server/test_lineage/test_operation_lineage.py +++ b/tests/test_server/test_lineage/test_operation_lineage.py @@ -391,11 +391,8 @@ async def test_get_operation_lineage_with_direction_and_until( ): # TODO: This test should be change cause `until` for operation has sense only for `depth` > 1 lineage = simple_lineage - # There is no guarantee that first operation will have any inputs. - # so we need to search for any operation - some_input = lineage.inputs[0] - operation = next(operation for operation in lineage.operations if operation.id == some_input.operation_id) + operation = lineage.operations[0] run = next(run for run in lineage.runs if run.id == operation.run_id) job = next(job for job in lineage.jobs if job.id == run.job_id) @@ -524,34 +521,30 @@ async def test_get_operation_lineage_with_depth( lineage_with_depth: LineageResult, ): lineage = lineage_with_depth + # Select only relations marked with * + # J1 -*> R1 -*> O1, D1 --> O1 -*> D2 + # J2 -*> R2 -*> O2, D2 -*> O2 -*> D3 + # J3 --> R3 --> O3, D3 --> O3 --> D4 - # There is no guarantee that first operation will have any output. - # so we need to search for any operation - some_output = lineage.outputs[0] - some_operation = next(operation for operation in lineage.operations if operation.id == some_output.operation_id) + first_level_operation = lineage.operations[0] # Go operations[first level] -> datasets[second level] - first_level_outputs = [output for output in lineage.outputs if output.operation_id == some_operation.id] + first_level_outputs = [output for output in lineage.outputs if output.operation_id == first_level_operation.id] first_level_dataset_ids = {output.dataset_id for output in first_level_outputs} - first_level_datasets = [dataset for dataset in lineage.datasets if dataset.id in first_level_dataset_ids] - assert first_level_datasets + assert first_level_dataset_ids # Go datasets[second level] -> operations[second level] second_level_inputs = [input for input in lineage.inputs if input.dataset_id in first_level_dataset_ids] second_level_operation_ids = {input.operation_id for input in second_level_inputs} - { - some_operation.id, + first_level_operation.id, } - second_level_operations = [ - operation for operation in lineage.operations if operation.id in second_level_operation_ids - ] - assert second_level_operations + assert second_level_operation_ids # Go operations[second level] -> datasets[third level] # There are more levels in this graph, but we stop here third_level_outputs = [output for output in lineage.outputs if output.operation_id in second_level_operation_ids] third_level_dataset_ids = {output.dataset_id for output in third_level_outputs} - first_level_dataset_ids - third_level_datasets = [dataset for dataset in lineage.datasets if dataset.id in third_level_dataset_ids] - assert third_level_datasets + assert third_level_dataset_ids inputs = second_level_inputs input_stats = relation_stats(inputs) @@ -561,7 +554,7 @@ async def test_get_operation_lineage_with_depth( dataset_ids = first_level_dataset_ids | third_level_dataset_ids datasets = [dataset for dataset in lineage.datasets if dataset.id in dataset_ids] - operation_ids = {some_operation.id} | second_level_operation_ids + operation_ids = {first_level_operation.id} | second_level_operation_ids operations = [operation for operation in lineage.operations if operation.id in operation_ids] run_ids = {operation.run_id for operation in operations} @@ -579,7 +572,7 @@ async def test_get_operation_lineage_with_depth( "v1/operations/lineage", params={ "since": since.isoformat(), - "start_node_id": str(some_operation.id), + "start_node_id": str(first_level_operation.id), "direction": "DOWNSTREAM", "depth": 3, }, diff --git a/tests/test_server/test_lineage/test_run_lineage.py b/tests/test_server/test_lineage/test_run_lineage.py index 97e6546..7094fa0 100644 --- a/tests/test_server/test_lineage/test_run_lineage.py +++ b/tests/test_server/test_lineage/test_run_lineage.py @@ -786,30 +786,29 @@ async def test_get_run_lineage_with_depth( lineage_with_depth: LineageResult, ): lineage = lineage_with_depth + # Select only relations marked with * + # J1 -*> R1, D1 --> R1 -*> D2 + # J2 -*> R2, D2 -*> R2 -*> D3 + # J3 --> R3, D3 --> R3 --> D4 - # There is no guarantee that first run will have any outputs - # so we need to search for any run - output = lineage.outputs[0] - run = next(run for run in lineage.runs if run.id == output.run_id) + first_level_run = lineage.runs[0] - # Go operations[first level] -> datasets[second level] - first_level_outputs = [output for output in lineage.outputs if output.run_id == run.id] + # Go runs[first level] -> datasets[second level] + first_level_outputs = [output for output in lineage.outputs if output.run_id == first_level_run.id] first_level_dataset_ids = {output.dataset_id for output in first_level_outputs} first_level_datasets = [dataset for dataset in lineage.datasets if dataset.id in first_level_dataset_ids] assert first_level_datasets - # Go datasets[second level] -> operations[second level] -> runs[second level] + # Go datasets[second level] -> runs[second level] second_level_inputs = [input for input in lineage.inputs if input.dataset_id in first_level_dataset_ids] - second_level_run_ids = {input.run_id for input in second_level_inputs} - {run.id} - second_level_runs = [run for run in lineage.runs if run.id in second_level_run_ids] - assert second_level_runs + second_level_run_ids = {input.run_id for input in second_level_inputs} - {first_level_run.id} + assert second_level_run_ids # Go runs[second level] -> datasets[third level] # There are more levels in this graph, but we stop here third_level_outputs = [output for output in lineage.outputs if output.run_id in second_level_run_ids] third_level_dataset_ids = {output.dataset_id for output in third_level_outputs} - first_level_dataset_ids - third_level_datasets = [dataset for dataset in lineage.datasets if dataset.id in third_level_dataset_ids] - assert third_level_datasets + assert third_level_dataset_ids inputs = second_level_inputs input_stats = relation_stats(inputs) @@ -819,7 +818,8 @@ async def test_get_run_lineage_with_depth( dataset_ids = first_level_dataset_ids | third_level_dataset_ids datasets = [dataset for dataset in lineage.datasets if dataset.id in dataset_ids] - runs = [run] + second_level_runs + run_ids = {first_level_run.id} | second_level_run_ids + runs = [run for run in lineage.runs if run.id in run_ids] job_ids = {run.job_id for run in runs} jobs = [job for job in lineage.jobs if job.id in job_ids] @@ -831,8 +831,8 @@ async def test_get_run_lineage_with_depth( response = await test_client.get( "v1/runs/lineage", params={ - "since": run.created_at.isoformat(), - "start_node_id": str(run.id), + "since": first_level_run.created_at.isoformat(), + "start_node_id": str(first_level_run.id), "direction": "DOWNSTREAM", "depth": 3, }, @@ -934,23 +934,25 @@ async def test_get_run_lineage_with_depth_and_operation_granularity( lineage_with_depth: LineageResult, ): lineage = lineage_with_depth + # Select only relations marked with * + # J1 -*> R1 -*> O1, D1 --> O1 -*> D2 + # J2 -*> R2 -*> O2, D2 -*> O2 -*> D3 + # J3 --> R3 --> O3, D3 --> O3 --> D4 - # There is no guarantee that first run will have any outputs - # so we need to search for any run - output = lineage.outputs[0] - operation = next(operation for operation in lineage.operations if operation.id == output.operation_id) - run = next(run for run in lineage.runs if run.id == operation.run_id) + first_level_run = lineage.runs[0] # Go operations[first level] -> datasets[second level] - first_level_operations = [operation for operation in lineage.operations if operation.run_id == run.id] + first_level_operations = [operation for operation in lineage.operations if operation.run_id == first_level_run.id] + assert first_level_operations first_level_operation_ids = {operation.id for operation in first_level_operations} first_level_outputs = [output for output in lineage.outputs if output.operation_id in first_level_operation_ids] + assert first_level_outputs first_level_dataset_ids = {output.dataset_id for output in first_level_outputs} - first_level_datasets = [dataset for dataset in lineage.datasets if dataset.id in first_level_dataset_ids] - assert first_level_datasets + assert first_level_dataset_ids # Go datasets[second level] -> operations[second level] second_level_inputs = [input for input in lineage.inputs if input.dataset_id in first_level_dataset_ids] + assert second_level_inputs second_level_operation_ids = {input.operation_id for input in second_level_inputs} - first_level_operation_ids second_level_operations = [ operation for operation in lineage.operations if operation.id in second_level_operation_ids @@ -961,8 +963,7 @@ async def test_get_run_lineage_with_depth_and_operation_granularity( # There are more levels in this graph, but we stop here third_level_outputs = [output for output in lineage.outputs if output.operation_id in second_level_operation_ids] third_level_dataset_ids = {output.dataset_id for output in third_level_outputs} - first_level_dataset_ids - third_level_datasets = [dataset for dataset in lineage.datasets if dataset.id in third_level_dataset_ids] - assert third_level_datasets + assert third_level_dataset_ids inputs = second_level_inputs input_stats = relation_stats(inputs) @@ -988,8 +989,8 @@ async def test_get_run_lineage_with_depth_and_operation_granularity( response = await test_client.get( "v1/runs/lineage", params={ - "since": run.created_at.isoformat(), - "start_node_id": str(run.id), + "since": first_level_run.created_at.isoformat(), + "start_node_id": str(first_level_run.id), "direction": "DOWNSTREAM", "granularity": "OPERATION", "depth": 3,