
[PERF] Remove calls to remote_len_partition #1660

Merged: 9 commits merged from jay/remote-len-refactor into main on Nov 22, 2023

Conversation

@jaychia (Contributor) commented Nov 22, 2023

This PR refactors PartitionSet.len_of_partitions to avoid usage of our remote_len_partition Ray remote function, which has been observed to cause problems when run on dataframes with large amounts of spilling.

A few refactors had to be performed to achieve this (a rough sketch of the resulting shape follows this list):

  1. The RayPartitionSet was refactored to hold RayMaterializedResult objects instead of raw ray.ObjectRef[Table].
    • This allows us to access the .metadata() method, which holds the length of each partition.
    • To access the ray.ObjectRef[Table], we can use the .partition() method, which returns the underlying partition.
  2. As part of (1), PartitionSet.set_partition had to be refactored to take a MaterializedResult as input instead of a plain PartitionT.
  3. On the execution end, we refactored the code mainly around MaterializedPhysicalPlan, which now yields MaterializedResult[PartitionT] instead of just PartitionT when indicating "done" tasks.
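A minimal sketch of the resulting shape, assuming simplified names and signatures (the PartitionMetadata stand-in here carries only num_rows; the real classes record more):

```python
from dataclasses import dataclass
from typing import Dict, Generic, List, TypeVar

PartitionT = TypeVar("PartitionT")


@dataclass
class PartitionMetadata:
    num_rows: int


class MaterializedResult(Generic[PartitionT]):
    """Toy stand-in: pairs a partition with metadata recorded at materialization time."""

    def __init__(self, part: PartitionT, meta: PartitionMetadata) -> None:
        self._part = part
        self._meta = meta

    def partition(self) -> PartitionT:
        return self._part

    def metadata(self) -> PartitionMetadata:
        return self._meta


class RayPartitionSetSketch(Generic[PartitionT]):
    """Holds MaterializedResult objects rather than raw ray.ObjectRef[Table] handles."""

    def __init__(self) -> None:
        self._results: Dict[int, MaterializedResult[PartitionT]] = {}

    def set_partition(self, idx: int, part: MaterializedResult[PartitionT]) -> None:
        self._results[idx] = part

    def len_of_partitions(self) -> List[int]:
        # Lengths come from locally held metadata; no remote_len_partition task is launched.
        return [r.metadata().num_rows for _, r in sorted(self._results.items())]
```

The key point is that each partition's length is read from metadata already attached to the result, so no extra Ray task has to touch the (possibly spilled) partition data.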

@@ -934,7 +934,7 @@ class PhysicalPlanScheduler:
     def num_partitions(self) -> int: ...
     def to_partition_tasks(
         self, psets: dict[str, list[PartitionT]], is_ray_runner: bool
-    ) -> physical_plan.MaterializedPhysicalPlan: ...
+    ) -> physical_plan.InProgressPhysicalPlan: ...
jaychia (Contributor, Author):

I think this was just typed incorrectly from the beginning; not sure why our typechecks didn't catch it.

@@ -251,35 +252,6 @@ def __repr__(self) -> str:
         return super().__str__()


-class MaterializedResult(Protocol[PartitionT]):
jaychia (Contributor, Author):

"Promoted" and moved to partitioning.py to avoid circular deps

T = TypeVar("T")


# A PhysicalPlan that is still being built - may yield both PartitionTaskBuilders and PartitionTasks.
InProgressPhysicalPlan = Iterator[Union[None, PartitionTask[PartitionT], PartitionTaskBuilder[PartitionT]]]

# A PhysicalPlan that is complete and will only yield PartitionTasks or final PartitionTs.
-MaterializedPhysicalPlan = Iterator[Union[None, PartitionTask[PartitionT], PartitionT]]
+MaterializedPhysicalPlan = Iterator[Union[None, PartitionTask[PartitionT], MaterializedResult[PartitionT]]]
jaychia (Contributor, Author):

NOTE this important change: instead of yielding PartitionT (either Table or ray.ObjectRef), we now yield the MaterializedResult container.
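A rough illustration of what that means for code that drains a completed plan (all classes below are toy stand-ins, not daft's real PartitionTask or MaterializedResult types):

```python
from typing import Iterator, List, Union


class PartitionTask:
    """Stand-in for an in-flight task still yielded by the plan."""


class MaterializedResult:
    """Stand-in for a finished partition plus its metadata."""

    def __init__(self, table: List[int]) -> None:
        self._table = table

    def partition(self) -> List[int]:
        return self._table

    def metadata(self) -> dict:
        return {"num_rows": len(self._table)}


PlanItem = Union[None, PartitionTask, MaterializedResult]


def collect_partitions(plan: Iterator[PlanItem]) -> List[List[int]]:
    """Previously terminal items were bare partitions; now each one must be unwrapped."""
    out = []
    for item in plan:
        if item is None or isinstance(item, PartitionTask):
            continue  # nothing finished at this step
        out.append(item.partition())
    return out


# Example: a "plan" that yields one pending task and one finished result.
print(collect_partitions(iter([None, PartitionTask(), MaterializedResult([1, 2, 3])])))
```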

@@ -92,6 +92,36 @@ def from_table(cls, table: Table) -> PartitionMetadata:
PartitionT = TypeVar("PartitionT")


+@runtime_checkable
jaychia (Contributor, Author):

This is incompatible with Python 3.7; we need to find a way to remove it.

samster25 (Member):

I think you can do `from typing_extensions import runtime_checkable` for Python 3.7.
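A common shape for that fallback (a sketch; whether daft gates on the Python version or depends on typing_extensions unconditionally is an assumption):

```python
import sys

if sys.version_info >= (3, 8):
    from typing import Protocol, runtime_checkable
else:
    # typing_extensions backports both names for Python 3.7.
    from typing_extensions import Protocol, runtime_checkable


@runtime_checkable
class HasMetadata(Protocol):
    """Illustrative protocol; runtime_checkable lets isinstance() checks against it work."""

    def metadata(self) -> object: ...
```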

@@ -126,7 +156,7 @@ def get_partition(self, idx: PartID) -> PartitionT:
        raise NotImplementedError()

    @abstractmethod
-    def set_partition(self, idx: PartID, part: PartitionT) -> None:
+    def set_partition(self, idx: PartID, part: MaterializedResult[PartitionT]) -> None:
jaychia (Contributor, Author):

set_partition now takes as input a MaterializedResult instead of a plain old PartitionT.


codecov bot commented Nov 22, 2023

Codecov Report

Merging #1660 (386f912) into main (55a95ab) will decrease coverage by 0.14%.
Report is 5 commits behind head on main.
The diff coverage is 89.18%.

Additional details and impacted files


@@            Coverage Diff             @@
##             main    #1660      +/-   ##
==========================================
- Coverage   85.02%   84.88%   -0.14%     
==========================================
  Files          55       55              
  Lines        5314     5318       +4     
==========================================
- Hits         4518     4514       -4     
- Misses        796      804       +8     
Files Coverage Δ
daft/dataframe/dataframe.py 86.07% <100.00%> (-1.14%) ⬇️
daft/execution/execution_step.py 92.70% <100.00%> (+1.04%) ⬆️
daft/execution/physical_plan.py 93.35% <100.00%> (-0.03%) ⬇️
daft/execution/rust_physical_plan_shim.py 98.59% <100.00%> (-0.02%) ⬇️
daft/runners/pyrunner.py 96.95% <100.00%> (-0.02%) ⬇️
daft/runners/runner.py 80.76% <100.00%> (-0.72%) ⬇️
daft/runners/runner_io.py 86.20% <100.00%> (-0.46%) ⬇️
daft/runners/ray_runner.py 89.59% <86.95%> (-0.16%) ⬇️
daft/runners/partitioning.py 80.45% <70.58%> (-1.22%) ⬇️

... and 1 file with indirect coverage changes

@samster25 (Member) left a comment:

Looks great! Can we also verify somehow that get_meta is not being called when running len on a materialized result in the dataframe?
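One hedged way to check that, sketched with unittest.mock (the patch target below is a guess at where get_meta lives and would need to point at the real module path):

```python
from unittest.mock import patch

import daft


def test_len_skips_get_meta():
    df = daft.from_pydict({"a": [1, 2, 3]}).collect()
    # Assumed patch target: replace with wherever get_meta is actually defined.
    with patch(
        "daft.runners.pyrunner.PyRunner.get_meta",
        side_effect=AssertionError("get_meta should not be called for a materialized df"),
    ):
        assert len(df) == 3
```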

@@ -198,8 +198,9 @@ def iter_partitions(self) -> Iterator[Union[Table, "RayObjectRef"]]:
        else:
            # Execute the dataframe in a streaming fashion.
            context = get_context()
-            partitions_iter = context.runner().run_iter(self._builder)
-            yield from partitions_iter
+            results_iter = context.runner().run_iter(self._builder)
samster25 (Member):

I think you may also have to handle the `if self._result is not None:` case. We can add a test that does:

df = df.collect()
for _ in df.iter_partitions():
  pass
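Fleshed out slightly, that test might look like the following (daft.from_pydict is used here as a convenient constructor; any small dataframe would do):

```python
import daft


def test_iter_partitions_after_collect():
    df = daft.from_pydict({"a": [1, 2, 3]}).collect()
    # Exercises the `if self._result is not None:` branch of iter_partitions.
    for part in df.iter_partitions():
        assert part is not None
```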

jaychia (Contributor, Author):

        if self._result is not None:
            # If the dataframe has already finished executing,
            # use the precomputed results.
            yield from self._result.values()

This block is still safe, because PartitionSet.values() still returns partitions (not MaterializedResult)
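A sketch of why that holds, assuming values() unwraps each stored result before returning (this is an assumption about the implementation's shape, not a quote of it):

```python
class PartitionSetValuesSketch:
    """Toy partition set: stores result wrappers, but hands back raw partitions from values()."""

    def __init__(self, results: dict) -> None:
        self._results = results  # idx -> object with a .partition() accessor

    def values(self) -> list:
        # Unwrap before returning, so iter_partitions can still yield plain partitions.
        return [result.partition() for _, result in sorted(self._results.items())]
```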

daft/execution/physical_plan.py: review thread (outdated, resolved)
daft/runners/ray_runner.py: review thread (outdated, resolved)
@jaychia enabled auto-merge (squash) November 22, 2023 04:39
@jaychia merged commit ff218e7 into main on Nov 22, 2023
39 checks passed
@jaychia deleted the jay/remote-len-refactor branch on November 22, 2023 04:50