[DOP-20061] add tests with None stats
TiGrib committed Nov 8, 2024
1 parent 587578e commit 4397d2e
Showing 2 changed files with 121 additions and 74 deletions.
74 changes: 0 additions & 74 deletions tests/test_server/test_lineage/test_job_lineage.py
@@ -1115,77 +1115,3 @@ async def test_get_job_lineage_with_symlinks(
for dataset in sorted(datasets, key=lambda x: x.id)
],
}


async def test_get_job_lineage_with_empty_relation_stats(
test_client: AsyncClient,
async_session: AsyncSession,
lineage_with_empty_relation_stats: LINEAGE_FIXTURE_ANNOTATION,
):
all_jobs, all_runs, _, all_datasets, _, all_outputs = lineage_with_empty_relation_stats

job = all_jobs[0]
outputs = [output for output in all_outputs if output.job_id == job.id]
dataset_ids = {output.dataset_id for output in outputs}
datasets = [dataset for dataset in all_datasets if dataset.id in dataset_ids]

[job] = await enrich_jobs([job], async_session)
datasets = await enrich_datasets(datasets, async_session)

since = min(run.created_at for run in all_runs if run.job_id == job.id)
response = await test_client.get(
"v1/jobs/lineage",
params={
"since": since.isoformat(),
"start_node_id": job.id,
"direction": "DOWNSTREAM",
},
)

assert response.status_code == HTTPStatus.OK, response.json()
assert response.json() == {
"relations": [
{
"kind": "OUTPUT",
"from": {"kind": "JOB", "id": job.id},
"to": {"kind": "DATASET", "id": output.dataset_id},
"type": "APPEND",
"num_bytes": None,
"num_rows": None,
"num_files": None,
"last_interaction_at": max([output.created_at for output in outputs]).strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
}
for output in outputs
],
"nodes": [
{
"kind": "JOB",
"id": job.id,
"name": job.name,
"type": job.type,
"location": {
"id": job.location.id,
"name": job.location.name,
"type": job.location.type,
"addresses": [{"url": address.url} for address in job.location.addresses],
"external_id": job.location.external_id,
},
},
]
+ [
{
"kind": "DATASET",
"id": dataset.id,
"format": dataset.format,
"name": dataset.name,
"location": {
"id": dataset.location.id,
"name": dataset.location.name,
"type": dataset.location.type,
"addresses": [{"url": address.url} for address in dataset.location.addresses],
"external_id": dataset.location.external_id,
},
}
for dataset in sorted(datasets, key=lambda x: x.id)
],
}
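
Both tests enrich their ORM rows with `enrich_jobs`, `enrich_runs`, and `enrich_datasets` before building the expected payload. A hypothetical sketch of that contract, inferred only from the call sites in these tests (the `Job` and `Location` models and the eager-load strategy are assumptions, not part of this diff):

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

# Hypothetical sketch, not the repository's actual helper: re-select the
# given jobs with location and address relationships eagerly loaded, so the
# test can read job.location.addresses without triggering lazy loads.
async def enrich_jobs(jobs, async_session: AsyncSession):
    result = await async_session.scalars(
        select(Job)  # Job / Location: the project's assumed ORM models
        .where(Job.id.in_([job.id for job in jobs]))
        .options(selectinload(Job.location).selectinload(Location.addresses)),
    )
    by_id = {job.id: job for job in result.all()}
    return [by_id[job.id] for job in jobs]  # preserve input order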
121 changes: 121 additions & 0 deletions tests/test_server/test_lineage/test_operation_lineage.py
@@ -990,3 +990,124 @@ async def test_get_operation_lineage_with_symlinks(
},
],
}


async def test_get_operation_lineage_with_empty_relation_stats(
test_client: AsyncClient,
async_session: AsyncSession,
lineage_with_empty_relation_stats: LINEAGE_FIXTURE_ANNOTATION,
):
all_jobs, all_runs, all_operations, all_datasets, _, all_outputs = lineage_with_empty_relation_stats

operation = all_operations[0]
outputs = [output for output in all_outputs if output.operation_id == operation.id]
dataset_ids = {output.dataset_id for output in outputs}
datasets = [dataset for dataset in all_datasets if dataset.id in dataset_ids]
run = next(run for run in all_runs if run.id == operation.run_id)
job = next(job for job in all_jobs if job.id == run.job_id)
datasets = await enrich_datasets(datasets, async_session)
[job] = await enrich_jobs([job], async_session)
[run] = await enrich_runs([run], async_session)

since = run.created_at
response = await test_client.get(
"v1/operations/lineage",
params={
"since": since.isoformat(),
"start_node_id": str(operation.id),
"direction": "DOWNSTREAM",
},
)

assert response.status_code == HTTPStatus.OK, response.json()
assert response.json() == {
"relations": [
{
"kind": "PARENT",
"from": {"kind": "JOB", "id": run.job_id},
"to": {"kind": "RUN", "id": str(run.id)},
},
{
"kind": "PARENT",
"from": {"kind": "RUN", "id": str(operation.run_id)},
"to": {"kind": "OPERATION", "id": str(operation.id)},
},
]
+ [
{
"kind": "OUTPUT",
"from": {"kind": "OPERATION", "id": str(operation.id)},
"to": {"kind": "DATASET", "id": output.dataset_id},
"type": "APPEND",
"num_bytes": None,
"num_rows": None,
"num_files": None,
"last_interaction_at": max([output.created_at for output in outputs]).strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
}
for output in outputs
],
"nodes": [
{
"kind": "JOB",
"id": job.id,
"name": job.name,
"type": job.type,
"location": {
"id": job.location.id,
"name": job.location.name,
"type": job.location.type,
"addresses": [{"url": address.url} for address in job.location.addresses],
"external_id": job.location.external_id,
},
},
]
+ [
{
"kind": "DATASET",
"id": dataset.id,
"format": dataset.format,
"name": dataset.name,
"location": {
"id": dataset.location.id,
"name": dataset.location.name,
"type": dataset.location.type,
"addresses": [{"url": address.url} for address in dataset.location.addresses],
"external_id": dataset.location.external_id,
},
}
for dataset in sorted(datasets, key=lambda x: x.id)
]
+ [
{
"kind": "RUN",
"id": str(run.id),
"job_id": run.job_id,
"created_at": run.created_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
"parent_run_id": str(run.parent_run_id),
"status": run.status.name,
"external_id": run.external_id,
"attempt": run.attempt,
"persistent_log_url": run.persistent_log_url,
"running_log_url": run.running_log_url,
"started_at": run.started_at.strftime("%Y-%m-%dT%H:%M:%SZ"),
"started_by_user": {"name": run.started_by_user.name},
"start_reason": run.start_reason.value,
"ended_at": run.ended_at.strftime("%Y-%m-%dT%H:%M:%SZ"),
"end_reason": run.end_reason,
},
{
"kind": "OPERATION",
"id": str(operation.id),
"created_at": operation.created_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
"run_id": str(operation.run_id),
"name": operation.name,
"status": operation.status.name,
"type": operation.type.value,
"position": operation.position,
"group": operation.group,
"description": operation.description,
"started_at": operation.started_at.strftime("%Y-%m-%dT%H:%M:%SZ"),
"ended_at": operation.ended_at.strftime("%Y-%m-%dT%H:%M:%SZ"),
},
],
}
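
The `lineage_with_empty_relation_stats` fixture itself is not part of this diff. Judging only by how both tests unpack it, it is assumed to yield a six-tuple of `(jobs, runs, operations, datasets, inputs, outputs)` whose output rows carry no statistics. A minimal self-contained sketch of that assumed shape (field names on the output model are inferred from the attribute accesses above):

from dataclasses import dataclass
from datetime import datetime
from uuid import UUID

# Stand-in illustrating the assumed shape only; the real fixture presumably
# inserts ORM rows into the test database instead of building dataclasses.
@dataclass
class FakeOutput:
    job_id: int          # jobs use integer ids (job.id is never str()-wrapped)
    run_id: UUID         # run/operation ids are str()-wrapped, hence UUID-like
    operation_id: UUID
    dataset_id: int
    created_at: datetime
    # "Empty relation stats": nothing recorded, so the lineage API is
    # expected to return null for num_bytes / num_rows / num_files.
    num_bytes: int | None = None
    num_rows: int | None = None
    num_files: int | None = None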
