-
Notifications
You must be signed in to change notification settings - Fork 655
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FEATURE] Pivot implementation #1645
[FEATURE] Pivot implementation #1645
Conversation
Codecov Report
@@ Coverage Diff @@
## master #1645 +/- ##
==========================================
+ Coverage 81.88% 81.93% +0.05%
==========================================
Files 79 79
Lines 9353 9379 +26
==========================================
+ Hits 7659 7685 +26
Misses 1694 1694
Continue to review full report at Codecov.
|
641adc2
to
408dbbd
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer to move the logic in dataframe.py
to the _query_compiler
. Some implementations have an optimized pivot
operation, so we will want the control to in the query_compiler
layer.
At |
Ok @dchigarev I will place a blocked tag on this PR. Feel free to remove it once #1649 is merged. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very nice, this is quite clean and should perform very well.
@devin-petersohn I've add some fixes into But there is some mismatch from pandas in this implementation. If our specified |
@dchigarev pandas 1.1 (August 1) will have a new feature added in pandas-dev/pandas#30584 to keep or drop |
@devin-petersohn I've finished implementing
|
dee328b
to
a1c6c90
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I ran some timings to see how this implementation performs, and I noticed that utilization is very high (good), but overall time is a bit slow.
On 30,000 rows 3 columns, pandas takes 166 ms, this PR takes >30s. The performance is highly dependent on the number of unique values in the index
column. With this case, defaulting to pandas takes 250ms.
Did you run performance on this yourself? I would not expect this performance seeing the implementation and knowing how groupby is implemented.
@devin-petersohn Yeah, the problem seems to appear with increasing amount of Reduce part in (please note if this method of measurement is incorrect) groupby_reduce @classmethod
def groupby_reduce(
cls, axis, partitions, by, map_func, reduce_func
): # pragma: no cover
from timeit import default_timer as timer
t1 = timer()
map_func = ray.put(map_func)
by_parts = np.squeeze(by)
if len(by_parts.shape) == 0:
by_parts = np.array([by_parts.item()])
print("GRP_preparations:", timer() - t1)
t1 = timer()
new_partitions = np.array(
[
[
PandasOnRayFramePartition(
func.remote(
part.oid,
by_parts[col_idx].oid if axis else by_parts[row_idx].oid,
map_func,
part.call_queue,
by_parts[col_idx].call_queue
if axis
else by_parts[row_idx].call_queue,
)
)
for col_idx, part in enumerate(partitions[row_idx])
]
for row_idx in range(len(partitions))
]
)
[y.get() for x in new_partitions for y in x]
print("GRP_map:", timer() - t1)
t1 = timer()
res = cls.map_axis_partitions(axis, new_partitions, reduce_func)
[y.get() for x in res for y in x]
print("GRP_reduce:", timer() - t1)
return res And I've got this result for
And if you'll look at CPU usage at the reduce part, probably only one core would be used. Let's look at two corner cases:
Let's say we have
Adding this line before reduce part to trigger resplitting at rows axis, gives increase of performance in 5x times new_partitions = cls.map_axis_partitions(1, new_partitions, lambda df: df) Execution time for
It seems that groupby + apply for high dimension data is pretty hard for pandas, for example for by = pd_df["_index"]
pd_df.drop(columns=["_index", "_columns"]).groupby(by).apply(lambda df: df) So I think unsplitted data passed to the reduce function is a problem here. P.S. Also tested the previous implementation of
766cc14 still has places to optimize and requires refactoring, this uses draft |
1995ae4
to
fdbaaab
Compare
@devin-petersohn I've refactored |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dchigarev It looks reasonable using unstack
here, I will leave comments about unstack
implementation on #1649.
a523f1b
to
ef58744
Compare
@devin-petersohn since |
ef58744
to
0b64517
Compare
0b64517
to
ef9ff37
Compare
Signed-off-by: Dmitry Chigarev <[email protected]>
ef9ff37
to
668987a
Compare
@devin-petersohn since |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The performance of this will not be very fast for this first implementation, but we will tune stack
and unstack
when #1975 is resolved.
…ect#1645) Signed-off-by: Dmitry Chigarev <[email protected]>
What do these changes do?
flake8 modin
black --check modin
git commit -s
That implementation uses
unstask
function that falls default to pandas in current master, but after #1649 will be merged it will not