feat: Add Index constructor, copy, get_level_values, to_series #334

Merged
merged 10 commits
Jan 26, 2024
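
For orientation, here is a minimal usage sketch of the surface this PR adds. It assumes `bigframes.pandas` re-exports the `Index` class and that a BigQuery session is configured; the sample values and names are illustrative, not taken from the diff.

```python
import bigframes.pandas as bpd

# New Index constructor: accepts list-like data plus optional dtype and name.
idx = bpd.Index([10, 20, 30], dtype="Int64", name="ids")

copied = idx.copy(name="ids_copy")  # copy, optionally assigning a new name
level = idx.get_level_values(0)     # select one index level by position (or label)
s = idx.to_series()                 # values as a Series, indexed by the index itself
```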
6 changes: 2 additions & 4 deletions bigframes/core/blocks.py
@@ -287,15 +287,14 @@ def reset_index(self, drop: bool = True) -> Block:
A new Block because dropping index columns can break references
from Index classes that point to this block.
"""
block = self
new_index_col_id = guid.generate_guid()
expr = self._expr.promote_offsets(new_index_col_id)
if drop:
# Even though the index might be part of the ordering, keep that
# ordering expression as reset_index shouldn't change the row
# order.
expr = expr.drop_columns(self.index_columns)
block = Block(
return Block(
expr,
index_columns=[new_index_col_id],
column_labels=self.column_labels,
@@ -321,13 +320,12 @@ def reset_index(self, drop: bool = True) -> Block:
# See: https://pandas.pydata.org/docs/reference/api/pandas.Index.insert.html
column_labels_modified = column_labels_modified.insert(level, label)

block = Block(
return Block(
expr,
index_columns=[new_index_col_id],
column_labels=column_labels_modified,
index_labels=[None],
)
return block

def set_index(
self,
2 changes: 1 addition & 1 deletion bigframes/core/indexers.py
@@ -294,7 +294,7 @@ def _loc_getitem_series_or_dataframe(
keys_df = keys_df.set_index(temp_name, drop=True)
return _perform_loc_list_join(series_or_dataframe, keys_df)
elif isinstance(key, bigframes.core.indexes.Index):
block = key._data._get_block()
block = key._block
block = block.select_columns(())
keys_df = bigframes.dataframe.DataFrame(block)
return _perform_loc_list_join(series_or_dataframe, keys_df)
183 changes: 146 additions & 37 deletions bigframes/core/indexes/index.py
@@ -17,8 +17,9 @@
from __future__ import annotations

import typing
from typing import Mapping, Sequence, Tuple, Union
from typing import Hashable, Mapping, Optional, Sequence, Tuple, Union

import google.cloud.bigquery as bigquery
import numpy as np
import pandas

@@ -33,16 +34,60 @@
import bigframes.core.utils as utils
import bigframes.dtypes
import bigframes.dtypes as bf_dtypes
import bigframes.formatting_helpers as formatter
import bigframes.operations as ops
import bigframes.operations.aggregations as agg_ops
import third_party.bigframes_vendored.pandas.core.indexes.base as vendored_pandas_index

if typing.TYPE_CHECKING:
import bigframes.dataframe
import bigframes.series


class Index(vendored_pandas_index.Index):
__doc__ = vendored_pandas_index.Index.__doc__

def __init__(self, data: blocks.BlockHolder):
self._data = data
def __init__(
self,
data=None,
dtype=None,
*,
name=None,
):
import bigframes.dataframe as df
import bigframes.series as series
@Genesis929 (Collaborator) commented on Jan 26, 2024:

I'm a bit curious why modules are imported inside a method.

Contributor Author replied:

It's to avoid a circular dependency. DataFrame.py and Series.py already depend on Index.py, so Index.py cannot have top-level imports of those modules.
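
To illustrate the reply above, a minimal sketch of why the deferred import works (hypothetical module names, not from this PR):

```python
# a.py
import b          # top-level import is fine: b does not need a at import time

class A:
    pass

# b.py
def make_a():
    import a      # deferred: runs at call time, after both modules initialize
    return a.A()

# A top-level `import a` in b.py would create a cycle: importing a runs
# `import b`, which would re-enter a while it is still partially initialized.
```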


if isinstance(data, blocks.Block):
block = data.select_columns([])
elif isinstance(data, df.DataFrame):
raise ValueError("Cannot construct index from dataframe.")
elif isinstance(data, series.Series) or isinstance(data, Index):
if isinstance(data, series.Series):
block = data._block
block = block.set_index(
col_ids=[data._value_column],
)
elif isinstance(data, Index):
block = data._block
index = Index(data=block)
name = data.name if name is None else name
if name is not None:
index.name = name
if dtype is not None:
index = index.astype(dtype)
block = index._block
else:
pd_index = pandas.Index(data=data, dtype=dtype, name=name)
pd_df = pandas.DataFrame(index=pd_index)
block = df.DataFrame(pd_df)._block
self._query_job = None
self._block: blocks.Block = block

@classmethod
def from_frame(
cls, frame: Union[bigframes.series.Series, bigframes.dataframe.DataFrame]
) -> Index:
return FrameIndex(frame)

@property
def name(self) -> blocks.Label:
@@ -55,15 +100,16 @@ def name(self, value: blocks.Label):
@property
def names(self) -> typing.Sequence[blocks.Label]:
"""Returns the names of the Index."""
return self._data._get_block()._index_labels
return self._block._index_labels

@names.setter
def names(self, values: typing.Sequence[blocks.Label]):
return self._data._set_block(self._block.with_index_labels(values))
new_block = self._block.with_index_labels(values)
self._block = new_block

@property
def nlevels(self) -> int:
return len(self._data._get_block().index_columns)
return len(self._block.index_columns)

@property
def values(self) -> np.ndarray:
@@ -75,7 +121,7 @@ def ndim(self) -> int:

@property
def shape(self) -> typing.Tuple[int]:
return (self._data._get_block().shape[0],)
return (self._block.shape[0],)

@property
def dtype(self):
@@ -107,9 +153,7 @@ def is_monotonic_increasing(self) -> bool:
"""
return typing.cast(
bool,
self._data._get_block().is_monotonic_increasing(
self._data._get_block().index_columns
),
self._block.is_monotonic_increasing(self._block.index_columns),
)

@property
@@ -122,9 +166,7 @@ def is_monotonic_decreasing(self) -> bool:
"""
return typing.cast(
bool,
self._data._get_block().is_monotonic_decreasing(
self._data._get_block().index_columns
),
self._block.is_monotonic_decreasing(self._block.index_columns),
)

@property
@@ -149,14 +191,65 @@ def has_duplicates(self) -> bool:
duplicates_df = df.DataFrame(duplicates_block)
return duplicates_df["is_duplicate"].any()

@property
def _block(self) -> blocks.Block:
return self._data._get_block()

@property
def T(self) -> Index:
return self.transpose()

@property
def query_job(self) -> Optional[bigquery.QueryJob]:
"""BigQuery job metadata for the most recent query.

Returns:
The most recent `QueryJob
<https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob>`_.
"""
if self._query_job is None:
self._query_job = self._block._compute_dry_run()
return self._query_job

def __repr__(self) -> str:
# TODO(swast): Add a timeout here? If the query is taking a long time,
# maybe we just print the job metadata that we have so far?
# TODO(swast): Avoid downloading the whole series by using job
# metadata, like we do with DataFrame.
opts = bigframes.options.display
max_results = opts.max_rows
if opts.repr_mode == "deferred":
return formatter.repr_query_job(self.query_job)

pandas_df, _, query_job = self._block.retrieve_repr_request_results(max_results)
self._query_job = query_job
return repr(pandas_df.index)

def copy(self, name: Optional[Hashable] = None):
copy_index = Index(self._block)
if name is not None:
copy_index.name = name
return copy_index

def to_series(
self, index: Optional[Index] = None, name: Optional[Hashable] = None
) -> bigframes.series.Series:
if self.nlevels != 1:
raise NotImplementedError(
f"Converting multi-index to series is not yet supported. {constants.FEEDBACK_LINK}"
)

import bigframes.series

name = self.name if name is None else name
if index is None:
return bigframes.series.Series(data=self, index=self, name=name)
else:
return bigframes.series.Series(data=self, index=Index(index), name=name)

def get_level_values(self, level) -> Index:
level_n = level if isinstance(level, int) else self.names.index(level)
block = self._block.drop_levels(
[self._block.index_columns[i] for i in range(self.nlevels) if i != level_n]
)
return Index(block)

def _memory_usage(self) -> int:
(n_rows,) = self.shape
return sum(
@@ -180,7 +273,7 @@ def sort_values(self, *, ascending: bool = True, na_position: str = "last"):
order.OrderingColumnReference(column, direction=direction, na_last=na_last)
for column in index_columns
]
return Index._from_block(self._block.order_by(ordering))
return Index(self._block.order_by(ordering))

def astype(
self,
@@ -269,7 +362,7 @@ def rename(self, name: Union[str, Sequence[str]]) -> Index:
names = [name] if isinstance(name, str) else list(name)
if len(names) != self.nlevels:
raise ValueError("'name' must be same length as levels")
return Index._from_block(self._block.with_index_labels(names))
return Index(self._block.with_index_labels(names))

def drop(
self,
@@ -291,17 +384,17 @@ def drop(
)
block = block.filter(condition_id, keep_null=True)
block = block.drop_columns([condition_id])
return Index._from_block(block)
return Index(block)

def dropna(self, how: str = "any") -> Index:
if how not in ("any", "all"):
raise ValueError("'how' must be one of 'any', 'all'")
result = block_ops.dropna(self._block, self._block.index_columns, how=how) # type: ignore
return Index._from_block(result)
return Index(result)

def drop_duplicates(self, *, keep: str = "first") -> Index:
block = block_ops.drop_duplicates(self._block, self._block.index_columns, keep)
return Index._from_block(block)
return Index(block)

def isin(self, values) -> Index:
if not utils.is_list_like(values):
@@ -330,7 +423,7 @@ def _apply_unary_expr(
result_ids.append(result_id)

block = block.set_index(result_ids, index_labels=self._block.index_labels)
return Index._from_block(block)
return Index(block)

def _apply_aggregation(self, op: agg_ops.AggregateOp) -> typing.Any:
if self.nlevels > 1:
@@ -344,7 +437,7 @@ def __getitem__(self, key: int) -> typing.Any:
result_pd_df, _ = self._block.slice(key, key + 1, 1).to_pandas()
else: # special case, want [-1:] instead of [-1:0]
result_pd_df, _ = self._block.slice(key).to_pandas()
if result_pd_df.empty:
if result_pd_df.index.empty:
raise IndexError("single positional indexer is out-of-bounds")
return result_pd_df.index[0]
else:
@@ -367,11 +460,36 @@ def to_numpy(self, dtype=None, **kwargs) -> np.ndarray:
def __len__(self):
return self.shape[0]

@classmethod
def _from_block(cls, block: blocks.Block) -> Index:
import bigframes.dataframe as df

return Index(df.DataFrame(block))
# Index that mutates the originating dataframe/series
class FrameIndex(Index):
def __init__(
self,
series_or_dataframe: typing.Union[
bigframes.series.Series, bigframes.dataframe.DataFrame
],
):
super().__init__(series_or_dataframe._block)
self._whole_frame = series_or_dataframe

@property
def name(self) -> blocks.Label:
return self.names[0]

@name.setter
def name(self, value: blocks.Label):
self.names = [value]

@property
def names(self) -> typing.Sequence[blocks.Label]:
"""Returns the names of the Index."""
return self._block._index_labels

@names.setter
def names(self, values: typing.Sequence[blocks.Label]):
new_block = self._whole_frame._get_block().with_index_labels(values)
self._whole_frame._set_block(new_block)
self._block = new_block
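
A brief illustrative contrast of the two classes (assuming an existing bigframes DataFrame `df` whose `.index` returns a FrameIndex, as `from_frame` suggests; this snippet is not part of the diff): FrameIndex writes label changes back into the originating frame, while a plain Index only mutates its own copy of the block.

```python
df.index.name = "user_id"   # FrameIndex: writes the relabeled block back into df
detached = Index(df.index)  # plain Index: snapshots the current block
detached.name = "other"     # relabels only the detached copy, not df
```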


class IndexValue:
@@ -406,15 +524,6 @@ def dtypes(
def session(self) -> core.Session:
return self._expr.session

def __repr__(self) -> str:
"""Converts an Index to a string."""
# TODO(swast): Add a timeout here? If the query is taking a long time,
# maybe we just print the job metadata that we have so far?
# TODO(swast): Avoid downloading the whole index by using job
# metadata, like we do with DataFrame.
preview = self.to_pandas()
return repr(preview)

def to_pandas(self) -> pandas.Index:
"""Executes deferred operations and downloads the results."""
# Project down to only the index column. So the query can be cached to visualize other data.