Skip to content

Commit

Permalink
[DOP-21295] Simplify tests a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
dolfinus committed Nov 16, 2024
1 parent da6eb4e commit 9753a8c
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 204 deletions.
88 changes: 58 additions & 30 deletions tests/test_server/fixtures/factories/lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,17 @@ async def simple_lineage(
job: Job,
user: User,
) -> AsyncGenerator[LineageResult, None]:
# This fixture generates a simple lineage graph with 1 job, 2 runs, 2 operations for each run, and 4 datasets.
# The structure is as follows: J --> R; R --> O1; R --> O2; D0 --> O1 --> D1; D2 --> O2 --> D3.
# Two independent operations, run twice:
# J1 -> R1 -> O1, D1 -> O1 -> D2
# J1 -> R1 -> O2, D3 -> O2 -> D4
# J1 -> R2 -> O3, D1 -> O3 -> D2
# J1 -> R2 -> O4, D3 -> O4 -> D4

lineage = LineageResult()
lineage.jobs.append(job)
num_runs = 2
num_operations = 2
num_datasets = 4

lineage = LineageResult(jobs=[job])
async with async_session_maker() as async_session:
created_at = datetime.now()
for n in range(num_runs):
Expand All @@ -52,11 +57,11 @@ async def simple_lineage(
"created_at": run.created_at + timedelta(seconds=0.2 * i),
},
)
for i in range(2)
for i in range(num_operations)
]
lineage.operations.extend(operations)

dataset_locations = [await create_location(async_session) for _ in range(4)]
dataset_locations = [await create_location(async_session) for _ in range(num_datasets)]
datasets = [await create_dataset(async_session, location_id=location.id) for location in dataset_locations]
lineage.datasets.extend(datasets)

Expand Down Expand Up @@ -168,29 +173,34 @@ async def three_days_lineage(
lineage.outputs.extend(outputs)
async_session.expunge_all()

yield lineage
yield lineage

async with async_session_maker() as async_session:
await clean_db(async_session)
async with async_session_maker() as async_session:
await clean_db(async_session)


@pytest_asyncio.fixture()
async def lineage_with_depth(
async_session_maker: Callable[[], AsyncContextManager[AsyncSession]],
user: User,
):
# This fixture generates a lineage with three simultaneous trees, structures as: J --> R --> O,
# connected through Input and Output datasets.
create_at = datetime.now()
# Three trees of J -> R -> O, connected via datasets:
# J1 -> R1 -> O1, D1 -> O1 -> D2
# J2 -> R2 -> O2, D2 -> O2 -> D3
# J3 -> R3 -> O3, D3 -> O3 -> D4

num_datasets = 4
num_jobs = 3
created_at = datetime.now()

lineage = LineageResult()
async with async_session_maker() as async_session:
dataset_locations = [await create_location(async_session) for _ in range(4)]
dataset_locations = [await create_location(async_session) for _ in range(num_datasets)]
datasets = [await create_dataset(async_session, location_id=location.id) for location in dataset_locations]
lineage.datasets.extend(datasets)

# Create a job, run and operation with IO datasets.
for i in range(3):

for i in range(num_jobs):
job_location = await create_location(async_session)
job = await create_job(async_session, location_id=job_location.id)
lineage.jobs.append(job)
Expand All @@ -200,7 +210,7 @@ async def lineage_with_depth(
run_kwargs={
"job_id": job.id,
"started_by_user_id": user.id,
"created_at": create_at + timedelta(seconds=i),
"created_at": created_at + timedelta(seconds=i),
},
)
lineage.runs.append(run)
Expand Down Expand Up @@ -239,27 +249,30 @@ async def lineage_with_depth(
)
lineage.outputs.append(output)

yield lineage
yield lineage

async with async_session_maker() as async_session:
await clean_db(async_session)
async with async_session_maker() as async_session:
await clean_db(async_session)


@pytest_asyncio.fixture()
async def lineage_with_depth_and_cycle(
async_session_maker: Callable[[], AsyncContextManager[AsyncSession]],
user: User,
):
# This fixture generates a lineage with two trees: J --> R --> O,
# connected through a single Dataset, forming a cycle.
# Two trees of J -> R -> O, connected via one dataset, forming cycles:
# J1 -> R1 -> O1, D -> O1 -> D
# J2 -> R2 -> O2, D -> O2 -> D

created_at = datetime.now()
num_jobs = 2
lineage = LineageResult()
async with async_session_maker() as async_session:
dataset_location = await create_location(async_session)
dataset = await create_dataset(async_session, location_id=dataset_location.id)
lineage.datasets.append(dataset)

jobs_location = [await create_location(async_session) for _ in range(2)]
jobs_location = [await create_location(async_session) for _ in range(num_jobs)]
jobs = [await create_job(async_session, location_id=location.id) for location in jobs_location]
lineage.jobs.extend(jobs)

Expand Down Expand Up @@ -321,29 +334,38 @@ async def lineage_with_depth_and_cycle(

lineage.runs.sort(key=lambda x: x.id)
lineage.jobs.sort(key=lambda x: x.id)
yield lineage

async with async_session_maker() as async_session:
await clean_db(async_session)
yield lineage

async with async_session_maker() as async_session:
await clean_db(async_session)


@pytest_asyncio.fixture()
async def lineage_with_symlinks(
async_session_maker: Callable[[], AsyncContextManager[AsyncSession]],
user: User,
) -> AsyncGenerator[LineageResult, None]:
# This fixture generates three simple lineage trees, structured as: J --> R --> O, D0 --> O --> D1S.
# Each dataset has a symlink dataset, and the trees are connected through these symlink datasets.
# Three trees of J -> R -> O, connected to datasets via symlinks:
# J1 -> R1 -> O1, D1 -> O1 -> D2S
# J2 -> R2 -> O2, D2 -> O2 -> D3S
# J3 -> R3 -> O3, D3 -> O3 -> D4S

lineage = LineageResult()
created_at = datetime.now()
num_datasets = 4
num_jobs = 3

async with async_session_maker() as async_session:
dataset_locations = [await create_location(async_session, location_kwargs={"type": "hdfs"}) for _ in range(4)]
dataset_locations = [
await create_location(async_session, location_kwargs={"type": "hdfs"}) for _ in range(num_datasets)
]
datasets = [await create_dataset(async_session, location_id=location.id) for location in dataset_locations]
lineage.datasets.extend(datasets)

symlink_locations = [await create_location(async_session, location_kwargs={"type": "hive"}) for _ in range(4)]
symlink_locations = [
await create_location(async_session, location_kwargs={"type": "hive"}) for _ in range(num_datasets)
]
symlink_datasets = [
await create_dataset(async_session, location_id=location.id) for location in symlink_locations
]
Expand All @@ -358,7 +380,7 @@ async def lineage_with_symlinks(
lineage.dataset_symlinks.extend(warehouse)

# Make graphs
for i in range(3):
for i in range(num_jobs):
job_location = await create_location(async_session)
job = await create_job(async_session, location_id=job_location.id)
lineage.jobs.append(job)
Expand Down Expand Up @@ -420,6 +442,12 @@ async def lineage_with_empty_relation_stats(
) -> AsyncGenerator[LineageResult, None]:
# This fixture generates a simple lineage tree structured as J --> R --> O and D0 --> O --> D1,
# with empty statistics in both Input and Output.

# One tree of J -> R -> O with one input and one output dataset:
# J1 -> R1 -> O1, D1 -> O1 -> D2

created_at = datetime.now()
lineage = LineageResult()
lineage.jobs.append(job)
Expand Down
Loading

0 comments on commit 9753a8c

Please sign in to comment.