Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass columns instead of Series to cudf.DataFrame in split-combine workflow #1429

Merged
merged 4 commits into from
Aug 7, 2024
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 25 additions & 21 deletions python/cuspatial/cuspatial/core/geodataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,18 +161,25 @@ def _split_out_geometry_columns(

def _recombine_columns(
self, geo_columns: GeoDataFrame, data_columns: cudf.DataFrame
) -> dict[Any, GeoSeries | cudf.Series]:
) -> dict[Any, GeoColumn | cudf.core.column.ColumnBase]:
"""
Combine a GeoDataFrame of only geometry columns with a DataFrame
of non-geometry columns in the same order as the columns in `self`

The output is meant for GeoDataFrame._from_data.
"""
if not geo_columns.index.equals(data_columns.index):
raise ValueError("geo_columns.index must equal data_columns.index")

columns_mask = self.columns
geocolumn_mask = (
isinstance(self[col], GeoSeries) for col in columns_mask
)
col_is_geo = (isinstance(self[col], GeoSeries) for col in columns_mask)
return {
name: (geo_columns[name] if mask else data_columns[name])
for name, mask in zip(columns_mask, geocolumn_mask)
name: (
geo_columns[name]._column
if is_geo
else data_columns[name]._column
)
for name, is_geo in zip(columns_mask, col_is_geo)
}

def _slice(self: T, arg: slice) -> T:
Expand All @@ -184,10 +191,10 @@ def _slice(self: T, arg: slice) -> T:
{name: geo_columns[name].iloc[arg] for name in geo_columns.columns}
)
sliced_data_columns = data_columns._slice(arg)
result = self._recombine_columns(
sliced_geo_columns, sliced_data_columns
return self._from_data(
self._recombine_columns(sliced_geo_columns, sliced_data_columns),
index=sliced_data_columns.index,
)
return self.__class__(result)

def _apply_boolean_mask(self, mask: BooleanMask, keep_index=True) -> T:
geo_columns, data_columns = self._split_out_geometry_columns()
Expand All @@ -197,7 +204,7 @@ def _apply_boolean_mask(self, mask: BooleanMask, keep_index=True) -> T:
{name: geo_columns[name][mask.column] for name in geo_columns}
)

res = self.__class__(self._recombine_columns(geo, data))
res = self._from_data(self._recombine_columns(geo, data))
if keep_index:
res.index = data.index
return res
Expand All @@ -217,12 +224,10 @@ def _gather(self, gather_map: GatherMap, keep_index=True):
geo_gathered = GeoDataFrame(gathered)

# combine
result = GeoDataFrame(
self._recombine_columns(geo_gathered, cudf_gathered)
return GeoDataFrame._from_data(
self._recombine_columns(geo_gathered, cudf_gathered),
index=geo_gathered.index,
)
result.index = geo_gathered.index
# return
return result

def reset_index(
self, level=None, drop=False, inplace=False, col_level=0, col_fill=""
Expand Down Expand Up @@ -270,7 +275,7 @@ def reset_index(
if not drop:
if not isinstance(cudf_data.index, cudf.MultiIndex):
recombiner.insert(
loc=0, name="index", value=cudf_reindexed["index"]
loc=0, column="index", value=cudf_reindexed["index"]
)
# If the index is a MultiIndex, we need to insert the
# individual levels into the GeoDataFrame.
Expand All @@ -288,7 +293,7 @@ def reset_index(
for n, name in enumerate(levels):
recombiner.insert(
loc=n,
name=name,
column=name,
value=cudf_reindexed[name].reset_index(drop=True),
)
recombiner.index = cudf_reindexed.index
Expand All @@ -301,11 +306,10 @@ def reset_index(
# Reset the index of the GeoDataFrame to match the
# cudf DataFrame and recombine.
geo_data.index = cudf_reindexed.index
result = GeoDataFrame(
recombiner._recombine_columns(geo_data, cudf_reindexed)
return GeoDataFrame._from_data(
recombiner._recombine_columns(geo_data, cudf_reindexed),
index=cudf_reindexed.index,
)
result.index = geo_data.index
return result


class _GeoSeriesUtility:
Expand Down
Loading