perf(eda): optimize plot_missing and plot_corr
brandonlockhart committed Oct 4, 2020
1 parent ecc0d6c commit b46036d
Showing 6 changed files with 80 additions and 77 deletions.
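For context, the two public entry points touched by this commit can be exercised with a small script. This is a hedged sketch assuming the public dataprep.eda API of this era (plot_correlation and plot_missing); the DataFrame and column names are made up for illustration.

# Hedged sketch: exercises the two code paths this commit optimizes.
# The data is synthetic; only the dataprep.eda entry points are assumed.
import numpy as np
import pandas as pd
from dataprep.eda import plot_correlation, plot_missing

rng = np.random.default_rng(0)
df = pd.DataFrame(
    {
        "height": rng.normal(170, 10, 1_000),
        "weight": rng.normal(70, 15, 1_000),
        "city": rng.choice(["Vancouver", "Toronto", "Montreal"], 1_000),
    }
)
df.loc[df.sample(frac=0.1, random_state=0).index, "weight"] = np.nan

plot_correlation(df, "height", "weight")  # bivariate correlation path rewritten below
plot_missing(df, "weight")                # univariate missing-value path rewritten below

Calling plot_correlation with both x and y hits the bivariate branch of compute_correlation, and plot_missing with a single column hits _compute_missing_univariate, the two areas changed in this diff.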
8 changes: 6 additions & 2 deletions dataprep/eda/correlation/compute/__init__.py
@@ -8,6 +8,8 @@
from .bivariate import _calc_bivariate
from .nullivariate import _calc_nullivariate
from .univariate import _calc_univariate
+from ...dtypes import NUMERICAL_DTYPES
+from ...utils import to_dask

__all__ = ["compute_correlation"]

@@ -34,8 +36,10 @@ def compute_correlation(
    k
        Choose top-k element
    """

-    df = DataArray(df).select_num_columns()
+    if x is not None and y is not None:
+        df = to_dask(df.select_dtypes(NUMERICAL_DTYPES))
+    else:
+        df = DataArray(df).select_num_columns()

    if x is None and y is None:  # pylint: disable=no-else-return
        return _calc_nullivariate(df, value_range=value_range, k=k)
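The change above routes the bivariate case through a plain dask DataFrame restricted to numerical columns instead of building a full DataArray. A minimal sketch of the same selection idea, with np.number and dd.from_pandas standing in for dataprep's internal NUMERICAL_DTYPES and to_dask (both are assumptions, not the library's code):

# Hedged sketch of the numeric-columns-only dispatch, using pandas + dask only.
import numpy as np
import pandas as pd
import dask.dataframe as dd

def select_numeric_as_dask(df: pd.DataFrame) -> dd.DataFrame:
    """Keep only numerical columns and wrap them in a dask DataFrame."""
    return dd.from_pandas(df.select_dtypes(include=np.number), npartitions=2)

pdf = pd.DataFrame({"a": [1.0, 2.0, 3.0], "b": [2.0, 4.0, 7.0], "c": ["x", "y", "z"]})
ddf = select_numeric_as_dask(pdf)
print(ddf.columns.tolist())  # ['a', 'b'] -- the non-numeric column is dropped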
86 changes: 31 additions & 55 deletions dataprep/eda/correlation/compute/bivariate.py
@@ -5,45 +5,27 @@
from typing import Optional, Tuple

import dask
import dask.dataframe as dd
import dask.array as da
import numpy as np
import pandas as pd


from ...data_array import DataArray
from ...intermediate import Intermediate


def _calc_bivariate(
-    df: DataArray,
-    x: Optional[str] = None,
-    y: Optional[str] = None,
-    *,
-    k: Optional[int] = None,
+    df: dd.DataFrame, x: str, y: str, *, k: Optional[int] = None,
) -> Intermediate:
    if x not in df.columns:
        raise ValueError(f"{x} not in columns names")
    if y not in df.columns:
        raise ValueError(f"{y} not in columns names")

-    xname, yname = x, y

-    df.compute()

-    xloc = df.columns.get_loc(x)
-    yloc = df.columns.get_loc(y)
+    df = df[[x, y]].dropna()
+    coeffs, df_smp, influences = scatter_with_regression(df, sample_size=1000, k=k)

-    x = df.values[:, xloc]
-    y = df.values[:, yloc]
-    coeffs, (x, y), influences = scatter_with_regression(x, y, k=k, sample_size=1000,)
+    coeffs, df_smp, influences = dask.compute(coeffs, df_smp, influences)

-    coeffs, (x, y), influences = dask.compute(coeffs, (x, y), influences)

    # lazy/eager border line
-    result = {
-        "coeffs": coeffs,
-        "data": pd.DataFrame({xname: x, yname: y}),
-    }
+    result = {"coeffs": coeffs, "data": df_smp}

    if (influences is None) != (k is None):
        raise RuntimeError("Not possible")
@@ -55,51 +37,45 @@ def _calc_bivariate(
        labels[infidx[-k:]] = "-"  # type: ignore
        # pylint: enable=invalid-unary-operand-type
        labels[infidx[:k]] = "+"
-        result["data"]["influence"] = labels
+        result["data"]["influence"] = labels  # type: ignore

    return Intermediate(**result, visual_type="correlation_scatter")
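Everything above the "lazy/eager border line" only builds dask graphs; the single dask.compute call then materializes the regression coefficients, the sampled scatter data, and the influences in one pass over the data. A small standalone sketch of that pattern on toy data (not dataprep code):

# Hedged sketch of the lazy/eager border: build several lazy dask results,
# then evaluate them together with one dask.compute call.
import dask
import dask.dataframe as dd
import pandas as pd

ddf = dd.from_pandas(pd.DataFrame({"x": range(10), "y": range(0, 20, 2)}), npartitions=2)

mean_x = ddf["x"].mean()              # lazy
corr_xy = ddf[["x", "y"]].corr()      # lazy
sample = ddf.head(3, compute=False)   # lazy

mean_x, corr_xy, sample = dask.compute(mean_x, corr_xy, sample)  # eager from here on
print(mean_x, corr_xy.loc["x", "y"], len(sample))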


def scatter_with_regression(
-    x: da.Array, y: da.Array, sample_size: int, k: Optional[int] = None
+    df: dd.DataFrame, sample_size: int, k: Optional[int] = None
) -> Tuple[Tuple[da.Array, da.Array], Tuple[da.Array, da.Array], Optional[da.Array]]:
"""Calculate pearson correlation on 2 given arrays.
    Parameters
    ----------
-    xarr : da.Array
-    yarr : da.Array
-    sample_size : int
+    df
+        dataframe
+    sample_size
        Number of points to show in the scatter plot
    k : Optional[int] = None
        Highlight k points which influence pearson correlation most
    """
    if k == 0:
        raise ValueError("k should be larger than 0")

-    xp1 = da.vstack([x, da.ones_like(x)]).T
-    xp1 = xp1.rechunk((xp1.chunks[0], -1))
-
-    mask = ~(da.isnan(x) | da.isnan(y))
-    # if chunk size in the first dimension is 1, lstsq will use sfqr instead of tsqr,
-    # where the former does not support nan in shape.
-
-    if len(xp1.chunks[0]) == 1:
-        xp1 = xp1.rechunk((2, -1))
-        y = y.rechunk((2, -1))
-        mask = mask.rechunk((2, -1))
-
-    (coeffa, coeffb), _, _, _ = da.linalg.lstsq(xp1[mask], y[mask])
-
-    if sample_size < x.shape[0]:
-        samplesel = da.random.choice(x.shape[0], int(sample_size), chunks=x.chunksize)
-        x = x[samplesel]
-        y = y[samplesel]
-
-    if k is None:
-        return (coeffa, coeffb), (x, y), None
+    df["ones"] = 1
+    arr = df.to_dask_array(lengths=True)
+
+    (coeffa, coeffb), _, _, _ = da.linalg.lstsq(arr[:, [0, 2]], arr[:, 1])
+
+    df = df.drop(columns=["ones"])
+    df_smp = df.map_partitions(
+        lambda x: x.sample(min(sample_size, x.shape[0])), meta=df
+    )
+    # TODO influences should not be computed on a sample
+    influences = (
+        pearson_influence(
+            df_smp[df.columns[0]].to_dask_array(lengths=True),
+            df_smp[df.columns[1]].to_dask_array(lengths=True),
+        )
+        if k
+        else None
+    )

-    influences = pearson_influence(x, y)
-    return (coeffa, coeffb), (x, y), influences
+    return (coeffa, coeffb), df_smp, influences
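The rewritten scatter_with_regression appends a column of ones so a single da.linalg.lstsq call on the stacked array yields both slope and intercept. A hedged standalone sketch of that trick on toy data:

# Hedged sketch of the least-squares trick above: add a ones column so
# lstsq solves for slope and intercept together.
import numpy as np
import dask.array as da

x = da.from_array(np.arange(20, dtype=float), chunks=5)
y = 3.0 * x + 2.0

design = da.stack([x, da.ones_like(x)], axis=1)   # columns: [x, 1]
design = design.rechunk((design.chunks[0], -1))   # keep the columns in a single chunk for lstsq
coeffs, _, _, _ = da.linalg.lstsq(design, y)

slope, intercept = coeffs.compute()
print(slope, intercept)  # 3.0 and 2.0 (up to floating-point error)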


def pearson_influence(xarr: da.Array, yarr: da.Array) -> da.Array:
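The body of pearson_influence is collapsed above. Conceptually it scores how strongly each observation pulls the Pearson coefficient; a hedged leave-one-out NumPy sketch of that idea follows (dataprep's dask implementation is presumably vectorized and may compute the score differently):

# Hedged sketch: leave-one-out influence of each point on the Pearson correlation.
import numpy as np

def pearson_influence_sketch(x: np.ndarray, y: np.ndarray) -> np.ndarray:
    """influence[i] = corr(all points) - corr(all points except i)."""
    full = np.corrcoef(x, y)[0, 1]
    out = np.empty(len(x))
    for i in range(len(x)):
        mask = np.arange(len(x)) != i
        out[i] = full - np.corrcoef(x[mask], y[mask])[0, 1]
    return out

rng = np.random.default_rng(0)
x = rng.normal(size=50)
y = x + rng.normal(scale=0.1, size=50)
y[0] = 10.0  # plant one outlier
infl = pearson_influence_sketch(x, y)
print(int(np.abs(infl).argmax()))  # expected: 0, the planted outlier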
3 changes: 2 additions & 1 deletion dataprep/eda/distribution/compute/bivariate.py
@@ -170,7 +170,8 @@ def compute_bivariate(
            visual_type="two_cat_cols",
        )
    elif is_dtype(xtype, Continuous()) and is_dtype(ytype, Continuous()):
-        df = df[[x, y]].dropna()
+        # one partition required for apply(pd.cut) in calc_box_num
+        df = df[[x, y]].dropna().repartition(npartitions=1)

        data: Dict[str, Any] = {}
        # scatter plot data
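Repartitioning to a single partition keeps the subsequent pd.cut-based binning in calc_box_num (a dataprep internal not shown in this diff) working on one global set of bins. A hedged pandas-only sketch of that kind of computation — bin x once, then take box-plot quartiles of y per bin:

# Hedged sketch: why consistent, global bins matter for per-bin box statistics.
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
df = pd.DataFrame({"x": rng.uniform(0, 10, 500), "y": rng.normal(0, 1, 500)})

df["bin"] = pd.cut(df["x"], bins=5)  # one global, equally spaced binning of x
box = df.groupby("bin", observed=True)["y"].quantile([0.25, 0.5, 0.75]).unstack()
box.columns = ["q1", "q2", "q3"]
print(box)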
6 changes: 4 additions & 2 deletions dataprep/eda/distribution/render.py
@@ -572,7 +572,7 @@ def box_viz(
        plot_height=plot_height,
        title=title,
        toolbar_location=None,
-        x_range=list(df["grp"]),
+        x_range=list(df["grp"].astype(str)),
    )
    low = fig.segment(x0="x0", y0="lw", x1="x1", y1="lw", line_color="black", source=df)
    ltail = fig.segment(
@@ -807,6 +807,7 @@ def line_viz(
    # pylint: disable=too-many-arguments,too-many-locals
    palette = CATEGORY20 * (len(df) // len(CATEGORY20) + 1)
    title = _make_title({f"{x}_ttl": ttl_grps, f"{x}_shw": len(df)}, x, y)
+    df.index = df.index.astype(str)

    fig = figure(
        plot_height=plot_height,
@@ -1525,7 +1526,7 @@ def nom_insights(data: Dict[str, Any], col: str) -> Dict[str, List[str]]:

    ## if cfg.insight.attribution_enable
    if data["pie"][:2].sum() / data["nrows"] > 0.5 and len(data["pie"]) >= 2:
-        vals = ", ".join(data["pie"].index[i] for i in range(2))
+        vals = ", ".join(str(data["pie"].index[i]) for i in range(2))
        ins["Pie Chart"].append(f"The top 2 categories ({vals}) take over 50%")

## if cfg.insight.high_word_cardinlaity_enable
@@ -1768,6 +1769,7 @@ def render_two_cat(itmdt: Intermediate, plot_width: int, plot_height: int,) -> T
    y_lrgst = ygrps.nlargest(itmdt["nsubgroups"])
    df = df[df[y].isin(y_lrgst.index)]
    stats.update(zip((f"{y}_ttl", f"{y}_shw"), (len(ygrps), len(y_lrgst))))
+    df[[x, y]] = df[[x, y]].astype(str)

    # final format
    df = df.pivot_table(index=y, columns=x, values="cnt", fill_value=0, aggfunc="sum")
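The render.py changes all cast group labels to str before they reach Bokeh, since a categorical x_range (or a categorical index) expects string factors and numeric labels would otherwise fail to match. A minimal sketch, assuming the Bokeh 2.x-style keyword names used elsewhere in this file:

# Hedged sketch: Bokeh categorical ranges want string factors, so numeric
# group labels are cast with astype(str), mirroring the changes above.
import pandas as pd
from bokeh.plotting import figure, show

df = pd.DataFrame({"grp": [1, 2, 3], "count": [10, 7, 3]})  # numeric group labels
factors = list(df["grp"].astype(str))                       # "1", "2", "3"

fig = figure(x_range=factors, plot_height=300, title="counts per group")
fig.vbar(x=factors, top=df["count"], width=0.8)
show(fig)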
27 changes: 27 additions & 0 deletions dataprep/eda/missing/compute/common.py
@@ -15,6 +15,33 @@
LABELS = ["With Missing", "Missing Dropped"]


+def uni_histogram(
+    srs: dd.Series, bins: int, dtype: Optional[DTypeDef] = None,
+) -> Tuple[da.Array, ...]:
+    """Calculate "histogram" for both numerical and categorical."""
+
+    if is_dtype(detect_dtype(srs, dtype), Continuous()):
+
+        counts, edges = da.histogram(srs, bins, range=[srs.min(), srs.max()])
+        centers = (edges[:-1] + edges[1:]) / 2
+
+        return counts, centers, edges
+
+    elif is_dtype(detect_dtype(srs, dtype), Nominal()):
+        # Dask array's unique is way slower than the values_counts on Series
+        # See https://github.com/dask/dask/issues/2851
+        # centers, counts = da.unique(arr, return_counts=True)
+
+        value_counts = srs.value_counts()
+
+        counts = value_counts.to_dask_array()
+        centers = value_counts.index.to_dask_array()
+
+        return (counts, centers)
+    else:
+        raise ValueError(f"Unsupported dtype {srs.dtype}")


def histogram(
    arr: da.Array,
    bins: Optional[int] = None,
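A hedged usage sketch for the new uni_histogram helper (an internal of dataprep.eda.missing.compute.common, per the file shown above): assuming the default dtype detection, the numeric column should take the da.histogram branch and return counts/centers/edges, while the string column should take the value_counts branch and return counts/centers.

# Hedged sketch: calling the internal helper defined above on a small dask DataFrame.
import dask
import dask.dataframe as dd
import pandas as pd

from dataprep.eda.missing.compute.common import uni_histogram  # internal helper, path per this diff

ddf = dd.from_pandas(
    pd.DataFrame({"age": [23, 35, 35, 41, 58], "city": ["a", "b", "a", "a", "c"]}),
    npartitions=2,
)

num_counts, centers, edges = uni_histogram(ddf["age"], bins=3)  # Continuous branch
cat_counts, cat_centers = uni_histogram(ddf["city"], bins=3)    # Nominal branch

print(dask.compute(num_counts, centers, edges, cat_counts, cat_centers))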
27 changes: 10 additions & 17 deletions dataprep/eda/missing/compute/univariate.py
@@ -8,43 +8,36 @@

from ...data_array import DataArray
from ...dtypes import (
    Continuous,
    DTypeDef,
    Nominal,
    detect_dtype,
    is_dtype,
)
from ...intermediate import ColumnsMetadata, Intermediate
from ...staged import staged
-from .common import LABELS, histogram
+from .common import LABELS, uni_histogram


def _compute_missing_univariate(  # pylint: disable=too-many-locals
    df: DataArray, x: str, bins: int, dtype: Optional[DTypeDef] = None,
) -> Generator[Any, Any, Intermediate]:
    """Calculate the distribution change on other columns when
    the missing values in x is dropped."""
-    j = df.columns.get_loc(x)

+    # dataframe with all rows where column x is null removed
+    ddf = df.frame[~df.frame[x].isna()]

    hists = {}

-    for i in range(len(df.columns)):
-        if i == j:
+    for col in df.columns:
+        if col == x:
            continue
-        col_name = df.columns[i]
-
-        col0 = df.values[~df.nulls[:, i], i].astype(df.dtypes[col_name])
-        col1 = df.values[~(df.nulls[:, j] | df.nulls[:, i]), i].astype(
-            df.dtypes[col_name]
-        )
-
-        hist_range = None  # pylint: disable=redefined-builtin
-        if is_dtype(detect_dtype(col0, dtype), Continuous()):
-            hist_range = (col0.min(axis=0), col0.max(axis=0))
+        srs0 = df.frame[col].dropna()  # series from original dataframe
+        srs1 = ddf[col].dropna()  # series with null rows from col x removed

-        hists[col_name] = [
-            histogram(col, dtype=dtype, bins=bins, return_edges=True, range=hist_range)
-            for col in [col0, col1]
+        hists[col] = [
+            uni_histogram(srs, bins=bins, dtype=dtype) for srs in [srs0, srs1]
        ]

    ### Lazy Region End
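The rewritten loop compares, for every other column, its distribution over all rows against its distribution over the rows where x is present. A hedged pandas-only analogue of that comparison (numerical columns only; the real code also handles categorical columns through uni_histogram, and its binning choices may differ):

# Hedged sketch: histogram each other column twice -- once on all rows, once
# on the rows where the target column x is not missing -- and compare.
import numpy as np
import pandas as pd

def missing_impact_sketch(df: pd.DataFrame, x: str, bins: int = 5) -> dict:
    ddf = df[~df[x].isna()]  # rows where x is present
    hists = {}
    for col in df.columns:
        if col == x or not np.issubdtype(df[col].dtype, np.number):
            continue
        srs0, srs1 = df[col].dropna(), ddf[col].dropna()
        edges = np.histogram_bin_edges(srs0, bins=bins)  # shared edges chosen here for comparability
        hists[col] = (np.histogram(srs0, bins=edges)[0], np.histogram(srs1, bins=edges)[0])
    return hists

df = pd.DataFrame({"a": [1, 2, 3, 4, 5, 6], "b": [1.0, np.nan, 3.0, np.nan, 5.0, 6.0]})
print(missing_impact_sketch(df, x="b"))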
