Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement DataFrame.apply #1259

Merged
merged 3 commits into from
Feb 11, 2020
Merged

Conversation

HyukjinKwon
Copy link
Member

@HyukjinKwon HyukjinKwon commented Feb 7, 2020

This PR proposes to implement DataFrame.apply with both axis 0 and 1. Note that, DataFrame.apply(..., axis=0) with global aggregations is impossible.

It can be tested with the examples below:

import numpy as np
import databricks.koalas as ks

df = ks.DataFrame([[4, 9]] * 10, columns=['A', 'B'])

df.apply(np.sqrt, axis=0)

def sqrt(x) -> ks.Series[float]:
    return np.sqrt(x)
df.apply(sqrt, axis=0)


df.apply(np.sum, axis=1)

def summation(x) -> int:
   return np.sum(x)

df.apply(summation, axis=1)

Basically the approach is using group map Pandas UDF by grouping by partitions.

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import functions as F

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def func(pdf):
    return pdf.apply(...)

df.groupby(F.spark_partition_id()).apply(func).show()

Resolves #1228
Resolves #65

@patryk-oleniuk
Copy link
Contributor

patryk-oleniuk commented Feb 7, 2020

Great!

Note that, DataFrame.apply(..., axis=1) with global aggregations is impossible.
Could you tell what do you exactly mean by that?

Is access to each element of the row available with axis=1?
Can I do this?

df = ks.DataFrame([[4, 9]] * 10, columns=['param1', 'param2'])

def my_func( x ) -> int:
  # x should be a row here 
  return run_regression_and_log_to_mlflow(x['param1'], x['param2'])

#should return a series of ints
df.apply(my_func, axis=1) 

Thanks!

@HyukjinKwon
Copy link
Member Author

@patryk-oleniuk, yup, that works:

df = ks.DataFrame([[4, 9]] * 10, columns=['param1', 'param2'])

def my_func( x ) -> int:
  # x should be a row here 
  print(x)
  return 1

#should return a series of ints
df.apply(my_func, axis=1) 
param1    4
param2    9
Name: 5, dtype: int64
param1    4
param2    9
Name: 5, dtype: int64
...
0    1
1    1
2    1
3    1
4    1
5    1
6    1
7    1
8    1
9    1
Name: 0, dtype: int32

@codecov-io
Copy link

codecov-io commented Feb 10, 2020

Codecov Report

Merging #1259 into master will decrease coverage by 1.38%.
The diff coverage is 93.44%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master    #1259      +/-   ##
==========================================
- Coverage   95.16%   93.77%   -1.39%     
==========================================
  Files          35       35              
  Lines        7151     7200      +49     
==========================================
- Hits         6805     6752      -53     
- Misses        346      448     +102
Impacted Files Coverage Δ
databricks/koalas/missing/frame.py 100% <ø> (ø) ⬆️
databricks/koalas/groupby.py 91.43% <100%> (ø) ⬆️
databricks/koalas/frame.py 96.68% <92%> (-0.14%) ⬇️
databricks/koalas/usage_logging/__init__.py 24.32% <0%> (-72.98%) ⬇️
databricks/koalas/usage_logging/usage_logger.py 50% <0%> (-50%) ⬇️
databricks/koalas/__init__.py 78.72% <0%> (-6.39%) ⬇️
databricks/conftest.py 92.45% <0%> (-3.78%) ⬇️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update d1a8229...12e47c7. Read the comment docs.

Copy link
Collaborator

@ueshin ueshin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

@HyukjinKwon
Copy link
Member Author

Thanks, @ueshin. Merged!

@HyukjinKwon HyukjinKwon merged commit 54b9ae0 into databricks:master Feb 11, 2020
@HyukjinKwon HyukjinKwon deleted the dataframe.apply branch September 11, 2020 07:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

DataFrame.apply(func, axis=1) UDF using DataFrame.apply
4 participants