Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PERF] Remove calls to remote_len_partition #1660

Merged
merged 9 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -934,7 +934,7 @@ class PhysicalPlanScheduler:
def num_partitions(self) -> int: ...
def to_partition_tasks(
self, psets: dict[str, list[PartitionT]], is_ray_runner: bool
) -> physical_plan.MaterializedPhysicalPlan: ...
) -> physical_plan.InProgressPhysicalPlan: ...
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this was just typed incorrectly from the beginning; I'm not sure why our typechecks didn't catch it.


class LogicalPlanBuilder:
"""
Expand Down
5 changes: 3 additions & 2 deletions daft/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,9 @@ def iter_partitions(self) -> Iterator[Union[Table, "RayObjectRef"]]:
else:
# Execute the dataframe in a streaming fashion.
context = get_context()
partitions_iter = context.runner().run_iter(self._builder)
yield from partitions_iter
results_iter = context.runner().run_iter(self._builder)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you may also have to handle the `if self._result is not None:` case. We can add a test that does:

df = df.collect()
for _ in df.iter_partitions():
  pass

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

        if self._result is not None:
            # If the dataframe has already finished executing,
            # use the precomputed results.
            yield from self._result.values()

This block is still safe, because `PartitionSet.values()` still returns partitions (not `MaterializedResult`).
image

for result in results_iter:
yield result.partition()

@DataframePublicAPI
def __repr__(self) -> str:
Expand Down
34 changes: 3 additions & 31 deletions daft/execution/execution_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import pathlib
import sys
from dataclasses import dataclass, field
from typing import Generic, TypeVar
from typing import Generic

if sys.version_info < (3, 8):
from typing_extensions import Protocol
Expand All @@ -26,14 +26,15 @@
from daft.logical.map_partition_ops import MapPartitionOp
from daft.logical.schema import Schema
from daft.runners.partitioning import (
MaterializedResult,
PartialPartitionMetadata,
PartitionMetadata,
PartitionT,
TableParseCSVOptions,
TableReadOptions,
)
from daft.table import Table, table_io

PartitionT = TypeVar("PartitionT")
ID_GEN = itertools.count()


Expand Down Expand Up @@ -251,35 +252,6 @@ def __repr__(self) -> str:
return super().__str__()


class MaterializedResult(Protocol[PartitionT]):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Promoted" and moved to partitioning.py to avoid circular dependencies.

"""A protocol for accessing the result partition of a PartitionTask.

Different Runners can fill in their own implementation here.
"""

def partition(self) -> PartitionT:
"""Get the partition of this result."""
...

def vpartition(self) -> Table:
"""Get the vPartition of this result."""
...

def metadata(self) -> PartitionMetadata:
"""Get the metadata of the partition in this result."""
...

def cancel(self) -> None:
"""If possible, cancel execution of this PartitionTask."""
...

def _noop(self, _: PartitionT) -> None:
"""Implement this as a no-op.
https://peps.python.org/pep-0544/#overriding-inferred-variance-of-protocol-classes
"""
...


class Instruction(Protocol):
"""An instruction is a function to run over a list of partitions.

Expand Down
11 changes: 7 additions & 4 deletions daft/execution/physical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,22 @@
)
from daft.expressions import ExpressionsProjection
from daft.logical.schema import Schema
from daft.runners.partitioning import PartialPartitionMetadata
from daft.runners.partitioning import (
MaterializedResult,
PartialPartitionMetadata,
PartitionT,
)

logger = logging.getLogger(__name__)

PartitionT = TypeVar("PartitionT")
T = TypeVar("T")


# A PhysicalPlan that is still being built - may yield both PartitionTaskBuilders and PartitionTasks.
InProgressPhysicalPlan = Iterator[Union[None, PartitionTask[PartitionT], PartitionTaskBuilder[PartitionT]]]

# A PhysicalPlan that is complete and will only yield PartitionTasks or final PartitionTs.
MaterializedPhysicalPlan = Iterator[Union[None, PartitionTask[PartitionT], PartitionT]]
MaterializedPhysicalPlan = Iterator[Union[None, PartitionTask[PartitionT], MaterializedResult[PartitionT]]]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NOTE this important change: instead of yielding `PartitionT` (either `Table` or `ray.ObjectRef`), we now yield the `MaterializedResult` container.



def _stage_id_counter():
Expand Down Expand Up @@ -738,7 +741,7 @@ def materialize(
# Check if any inputs finished executing.
while len(materializations) > 0 and materializations[0].done():
done_task = materializations.popleft()
yield done_task.partition()
yield done_task._result
jaychia marked this conversation as resolved.
Show resolved Hide resolved

# Materialize a single dependency.
try:
Expand Down
6 changes: 2 additions & 4 deletions daft/execution/rust_physical_plan_shim.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Iterator, TypeVar, cast
from typing import Iterator, cast

from daft.daft import (
FileFormat,
Expand All @@ -19,11 +19,9 @@
from daft.expressions import Expression, ExpressionsProjection
from daft.logical.map_partition_ops import MapPartitionOp
from daft.logical.schema import Schema
from daft.runners.partitioning import PartialPartitionMetadata
from daft.runners.partitioning import PartialPartitionMetadata, PartitionT
from daft.table import Table

PartitionT = TypeVar("PartitionT")


def scan_with_tasks(
scan_tasks: list[ScanTask],
Expand Down
36 changes: 35 additions & 1 deletion daft/runners/partitioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,40 @@
PartitionT = TypeVar("PartitionT")


class MaterializedResult(Generic[PartitionT]):
"""A protocol for accessing the result partition of a PartitionTask.

Different Runners can fill in their own implementation here.
"""

@abstractmethod
def partition(self) -> PartitionT:
"""Get the partition of this result."""
...

Check warning on line 104 in daft/runners/partitioning.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/partitioning.py#L104

Added line #L104 was not covered by tests

@abstractmethod
def vpartition(self) -> Table:
"""Get the vPartition of this result."""
...

Check warning on line 109 in daft/runners/partitioning.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/partitioning.py#L109

Added line #L109 was not covered by tests

@abstractmethod
def metadata(self) -> PartitionMetadata:
"""Get the metadata of the partition in this result."""
...

Check warning on line 114 in daft/runners/partitioning.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/partitioning.py#L114

Added line #L114 was not covered by tests

@abstractmethod
def cancel(self) -> None:
"""If possible, cancel execution of this PartitionTask."""
...

Check warning on line 119 in daft/runners/partitioning.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/partitioning.py#L119

Added line #L119 was not covered by tests

@abstractmethod
def _noop(self, _: PartitionT) -> None:
"""Implement this as a no-op.
https://peps.python.org/pep-0544/#overriding-inferred-variance-of-protocol-classes
"""
...

Check warning on line 126 in daft/runners/partitioning.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/partitioning.py#L126

Added line #L126 was not covered by tests


class PartitionSet(Generic[PartitionT]):
def _get_merged_vpartition(self) -> Table:
raise NotImplementedError()
Expand Down Expand Up @@ -126,7 +160,7 @@
raise NotImplementedError()

@abstractmethod
def set_partition(self, idx: PartID, part: PartitionT) -> None:
def set_partition(self, idx: PartID, part: MaterializedResult[PartitionT]) -> None:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set_partition now takes as input a MaterializedResult instead of a plain old PartitionT.

raise NotImplementedError()

@abstractmethod
Expand Down
30 changes: 18 additions & 12 deletions daft/runners/pyrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@
StorageConfig,
)
from daft.execution import physical_plan
from daft.execution.execution_step import Instruction, MaterializedResult, PartitionTask
from daft.execution.execution_step import Instruction, PartitionTask
from daft.filesystem import glob_path_with_stats
from daft.internal.gpu import cuda_device_count
from daft.logical.builder import LogicalPlanBuilder
from daft.logical.schema import Schema
from daft.runners import runner_io
from daft.runners.partitioning import (
MaterializedResult,
PartID,
PartitionCacheEntry,
PartitionMetadata,
Expand Down Expand Up @@ -52,8 +53,8 @@ def _get_merged_vpartition(self) -> Table:
def get_partition(self, idx: PartID) -> Table:
return self._partitions[idx]

def set_partition(self, idx: PartID, part: Table) -> None:
self._partitions[idx] = part
def set_partition(self, idx: PartID, part: MaterializedResult[Table]) -> None:
self._partitions[idx] = part.partition()

def delete_partition(self, idx: PartID) -> None:
del self._partitions[idx]
Expand Down Expand Up @@ -119,11 +120,11 @@ def runner_io(self) -> PyRunnerIO:
return PyRunnerIO()

def run(self, builder: LogicalPlanBuilder) -> PartitionCacheEntry:
partitions = list(self.run_iter(builder))
results = list(self.run_iter(builder))

result_pset = LocalPartitionSet({})
for i, partition in enumerate(partitions):
result_pset.set_partition(i, partition)
for i, result in enumerate(results):
result_pset.set_partition(i, result)

pset_entry = self.put_partition_set_into_cache(result_pset)
return pset_entry
Expand All @@ -133,7 +134,7 @@ def run_iter(
builder: LogicalPlanBuilder,
# NOTE: PyRunner does not run any async execution, so it ignores `results_buffer_size` which is essentially 0
results_buffer_size: int | None = None,
) -> Iterator[Table]:
) -> Iterator[PyMaterializedResult]:
# Optimize the logical plan.
builder = builder.optimize()
# Finalize the logical plan and get a physical plan scheduler for translating the
Expand All @@ -147,13 +148,16 @@ def run_iter(
# Get executable tasks from planner.
tasks = plan_scheduler.to_partition_tasks(psets, is_ray_runner=False)
with profiler("profile_PyRunner.run_{datetime.now().isoformat()}.json"):
partitions_gen = self._physical_plan_to_partitions(tasks)
yield from partitions_gen
results_gen = self._physical_plan_to_partitions(tasks)
yield from results_gen

def run_iter_tables(self, builder: LogicalPlanBuilder, results_buffer_size: int | None = None) -> Iterator[Table]:
return self.run_iter(builder, results_buffer_size=results_buffer_size)
for result in self.run_iter(builder, results_buffer_size=results_buffer_size):
yield result.partition()

def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalPlan) -> Iterator[Table]:
def _physical_plan_to_partitions(
self, plan: physical_plan.MaterializedPhysicalPlan[Table]
) -> Iterator[PyMaterializedResult]:
inflight_tasks: dict[str, PartitionTask] = dict()
inflight_tasks_resources: dict[str, ResourceRequest] = dict()
future_to_task: dict[futures.Future, str] = dict()
Expand All @@ -171,7 +175,9 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP
# Blocked on already dispatched tasks; await some tasks.
break

elif isinstance(next_step, Table):
elif isinstance(next_step, MaterializedResult):
assert isinstance(next_step, PyMaterializedResult)

# A final result.
yield next_step
next_step = next(plan)
Expand Down
Loading
Loading