Skip to content

Commit

Permalink
feat: Add Index constructor, repr, copy, get_level_values, to_series (#…
Browse files Browse the repository at this point in the history
…334)

* feat: Add Index constructor, copy, get_level_values, to_series

fix mypy error

* fix constructor bug

* fix error with index name mutation

* refactor index to make mutation clearer

* fix index bugs

* give index custom repr

---------

Co-authored-by: Huan Chen <[email protected]>
  • Loading branch information
TrevorBergeron and Genesis929 authored Jan 26, 2024
1 parent 677f014 commit e5d054e
Show file tree
Hide file tree
Showing 11 changed files with 375 additions and 76 deletions.
6 changes: 2 additions & 4 deletions bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,15 +287,14 @@ def reset_index(self, drop: bool = True) -> Block:
A new Block because dropping index columns can break references
from Index classes that point to this block.
"""
block = self
new_index_col_id = guid.generate_guid()
expr = self._expr.promote_offsets(new_index_col_id)
if drop:
# Even though the index might be part of the ordering, keep that
# ordering expression as reset_index shouldn't change the row
# order.
expr = expr.drop_columns(self.index_columns)
block = Block(
return Block(
expr,
index_columns=[new_index_col_id],
column_labels=self.column_labels,
Expand All @@ -321,13 +320,12 @@ def reset_index(self, drop: bool = True) -> Block:
# See: https://pandas.pydata.org/docs/reference/api/pandas.Index.insert.html
column_labels_modified = column_labels_modified.insert(level, label)

block = Block(
return Block(
expr,
index_columns=[new_index_col_id],
column_labels=column_labels_modified,
index_labels=[None],
)
return block

def set_index(
self,
Expand Down
2 changes: 1 addition & 1 deletion bigframes/core/indexers.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ def _loc_getitem_series_or_dataframe(
keys_df = keys_df.set_index(temp_name, drop=True)
return _perform_loc_list_join(series_or_dataframe, keys_df)
elif isinstance(key, bigframes.core.indexes.Index):
block = key._data._get_block()
block = key._block
block = block.select_columns(())
keys_df = bigframes.dataframe.DataFrame(block)
return _perform_loc_list_join(series_or_dataframe, keys_df)
Expand Down
183 changes: 146 additions & 37 deletions bigframes/core/indexes/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
from __future__ import annotations

import typing
from typing import Mapping, Sequence, Tuple, Union
from typing import Hashable, Mapping, Optional, Sequence, Tuple, Union

import google.cloud.bigquery as bigquery
import numpy as np
import pandas

Expand All @@ -33,16 +34,60 @@
import bigframes.core.utils as utils
import bigframes.dtypes
import bigframes.dtypes as bf_dtypes
import bigframes.formatting_helpers as formatter
import bigframes.operations as ops
import bigframes.operations.aggregations as agg_ops
import third_party.bigframes_vendored.pandas.core.indexes.base as vendored_pandas_index

if typing.TYPE_CHECKING:
import bigframes.dataframe
import bigframes.series


class Index(vendored_pandas_index.Index):
__doc__ = vendored_pandas_index.Index.__doc__

def __init__(self, data: blocks.BlockHolder):
self._data = data
def __init__(
    self,
    data=None,
    dtype=None,
    *,
    name=None,
):
    """Build an Index from a Block, Series, another Index, or local data.

    Args:
        data: Source of the index values. May be an internal
            ``blocks.Block``, a BigFrames ``Series`` (its value column
            becomes the index), another ``Index``, or any pandas-compatible
            local data. DataFrames are rejected.
        dtype: Optional dtype to cast the resulting index to.
        name: Optional name; overrides any name inherited from ``data``.
    """
    # Imported here (not at module top) to avoid a circular dependency
    # between this module and dataframe/series.
    import bigframes.dataframe as df
    import bigframes.series as series

    if isinstance(data, blocks.Block):
        # Internal path: keep only the index columns; value columns are
        # irrelevant to an Index.
        block = data.select_columns([])
    elif isinstance(data, df.DataFrame):
        raise ValueError("Cannot construct index from dataframe.")
    elif isinstance(data, series.Series) or isinstance(data, Index):
        if isinstance(data, series.Series):
            block = data._block
            # Promote the series' value column to be the index column.
            block = block.set_index(
                col_ids=[data._value_column],
            )
        elif isinstance(data, Index):
            block = data._block
        # Wrap in a temporary Index so name/dtype adjustments go through
        # the normal setters without mutating `data`'s own block.
        index = Index(data=block)
        # An explicit `name` argument wins over the source's name.
        name = data.name if name is None else name
        if name is not None:
            index.name = name
        if dtype is not None:
            index = index.astype(dtype)
        block = index._block
    else:
        # Local data (list, ndarray, ...): let pandas build the index,
        # then upload it by way of an empty-columned DataFrame.
        pd_index = pandas.Index(data=data, dtype=dtype, name=name)
        pd_df = pandas.DataFrame(index=pd_index)
        block = df.DataFrame(pd_df)._block
    # Dry-run job metadata is populated lazily by the `query_job` property.
    self._query_job = None
    self._block: blocks.Block = block

@classmethod
def from_frame(
    cls, frame: Union[bigframes.series.Series, bigframes.dataframe.DataFrame]
) -> Index:
    """Create an index view linked to ``frame``: label mutations on the
    returned index propagate back to the originating frame."""
    linked_index = FrameIndex(frame)
    return linked_index

@property
def name(self) -> blocks.Label:
Expand All @@ -55,15 +100,16 @@ def name(self, value: blocks.Label):
@property
def names(self) -> typing.Sequence[blocks.Label]:
"""Returns the names of the Index."""
return self._data._get_block()._index_labels
return self._block._index_labels

@names.setter
def names(self, values: typing.Sequence[blocks.Label]):
return self._data._set_block(self._block.with_index_labels(values))
new_block = self._block.with_index_labels(values)
self._block = new_block

@property
def nlevels(self) -> int:
return len(self._data._get_block().index_columns)
return len(self._block.index_columns)

@property
def values(self) -> np.ndarray:
Expand All @@ -75,7 +121,7 @@ def ndim(self) -> int:

@property
def shape(self) -> typing.Tuple[int]:
return (self._data._get_block().shape[0],)
return (self._block.shape[0],)

@property
def dtype(self):
Expand Down Expand Up @@ -107,9 +153,7 @@ def is_monotonic_increasing(self) -> bool:
"""
return typing.cast(
bool,
self._data._get_block().is_monotonic_increasing(
self._data._get_block().index_columns
),
self._block.is_monotonic_increasing(self._block.index_columns),
)

@property
Expand All @@ -122,9 +166,7 @@ def is_monotonic_decreasing(self) -> bool:
"""
return typing.cast(
bool,
self._data._get_block().is_monotonic_decreasing(
self._data._get_block().index_columns
),
self._block.is_monotonic_decreasing(self._block.index_columns),
)

@property
Expand All @@ -149,14 +191,65 @@ def has_duplicates(self) -> bool:
duplicates_df = df.DataFrame(duplicates_block)
return duplicates_df["is_duplicate"].any()

@property
def _block(self) -> blocks.Block:
return self._data._get_block()

@property
def T(self) -> Index:
    """Shorthand property alias for :meth:`transpose`."""
    return self.transpose()

@property
def query_job(self) -> Optional[bigquery.QueryJob]:
    """BigQuery job metadata for the most recent query.

    Returns:
        The most recent `QueryJob
        <https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob>`_.
    """
    cached_job = self._query_job
    if cached_job is None:
        # Populate lazily with a dry run so first access stays cheap.
        cached_job = self._block._compute_dry_run()
        self._query_job = cached_job
    return cached_job

def __repr__(self) -> str:
    """Render the index by downloading (up to max_rows of) it and
    delegating to the pandas Index repr."""
    # TODO(swast): Add a timeout here? If the query is taking a long time,
    # maybe we just print the job metadata that we have so far?
    # TODO(swast): Avoid downloading the whole series by using job
    # metadata, like we do with DataFrame.
    display_opts = bigframes.options.display
    if display_opts.repr_mode == "deferred":
        # Deferred mode: show job metadata instead of running the query.
        return formatter.repr_query_job(self.query_job)

    pandas_df, _, query_job = self._block.retrieve_repr_request_results(
        display_opts.max_rows
    )
    self._query_job = query_job
    return repr(pandas_df.index)

def copy(self, name: Optional[Hashable] = None):
    """Return a new Index over the same underlying block.

    Args:
        name: If given, the copy is renamed to this; otherwise the
            original name is kept.
    """
    duplicate = Index(self._block)
    if name is None:
        return duplicate
    duplicate.name = name
    return duplicate

def to_series(
    self, index: Optional[Index] = None, name: Optional[Hashable] = None
) -> bigframes.series.Series:
    """Create a Series whose values are the index keys.

    Args:
        index: Index for the resulting Series; defaults to this index.
        name: Name of the resulting Series; defaults to this index's name.

    Returns:
        A Series with values equal to the index keys.

    Raises:
        NotImplementedError: If this is a multi-level index.
    """
    if self.nlevels != 1:
        # Fix: the exception was previously constructed but never raised,
        # making this guard a silent no-op for multi-indexes.
        raise NotImplementedError(
            f"Converting multi-index to series is not yet supported. {constants.FEEDBACK_LINK}"
        )

    import bigframes.series

    name = self.name if name is None else name
    if index is None:
        return bigframes.series.Series(data=self, index=self, name=name)
    else:
        return bigframes.series.Series(data=self, index=Index(index), name=name)

def get_level_values(self, level) -> Index:
    """Return a single-level Index for the given level (position or label)."""
    if isinstance(level, int):
        target = level
    else:
        # Resolve a level label to its position.
        target = self.names.index(level)
    other_levels = [
        col
        for position, col in enumerate(self._block.index_columns)
        if position != target
    ]
    return Index(self._block.drop_levels(other_levels))

def _memory_usage(self) -> int:
(n_rows,) = self.shape
return sum(
Expand All @@ -180,7 +273,7 @@ def sort_values(self, *, ascending: bool = True, na_position: str = "last"):
order.OrderingColumnReference(column, direction=direction, na_last=na_last)
for column in index_columns
]
return Index._from_block(self._block.order_by(ordering))
return Index(self._block.order_by(ordering))

def astype(
self,
Expand Down Expand Up @@ -269,7 +362,7 @@ def rename(self, name: Union[str, Sequence[str]]) -> Index:
names = [name] if isinstance(name, str) else list(name)
if len(names) != self.nlevels:
raise ValueError("'name' must be same length as levels")
return Index._from_block(self._block.with_index_labels(names))
return Index(self._block.with_index_labels(names))

def drop(
self,
Expand All @@ -291,17 +384,17 @@ def drop(
)
block = block.filter(condition_id, keep_null=True)
block = block.drop_columns([condition_id])
return Index._from_block(block)
return Index(block)

def dropna(self, how: str = "any") -> Index:
    """Return a new Index with missing values removed.

    Args:
        how: "any" drops rows with any null level; "all" only drops rows
            where every level is null.

    Raises:
        ValueError: If ``how`` is not "any" or "all".
    """
    if how not in ("any", "all"):
        raise ValueError("'how' must be one of 'any', 'all'")
    filtered = block_ops.dropna(self._block, self._block.index_columns, how=how)  # type: ignore
    return Index(filtered)

def drop_duplicates(self, *, keep: str = "first") -> Index:
    """Return an Index with duplicate values removed.

    Args:
        keep: Which duplicate to retain (passed through to the block op).
    """
    deduplicated = block_ops.drop_duplicates(
        self._block, self._block.index_columns, keep
    )
    return Index(deduplicated)

def isin(self, values) -> Index:
if not utils.is_list_like(values):
Expand Down Expand Up @@ -330,7 +423,7 @@ def _apply_unary_expr(
result_ids.append(result_id)

block = block.set_index(result_ids, index_labels=self._block.index_labels)
return Index._from_block(block)
return Index(block)

def _apply_aggregation(self, op: agg_ops.AggregateOp) -> typing.Any:
if self.nlevels > 1:
Expand All @@ -344,7 +437,7 @@ def __getitem__(self, key: int) -> typing.Any:
result_pd_df, _ = self._block.slice(key, key + 1, 1).to_pandas()
else: # special case, want [-1:] instead of [-1:0]
result_pd_df, _ = self._block.slice(key).to_pandas()
if result_pd_df.empty:
if result_pd_df.index.empty:
raise IndexError("single positional indexer is out-of-bounds")
return result_pd_df.index[0]
else:
Expand All @@ -367,11 +460,36 @@ def to_numpy(self, dtype=None, **kwargs) -> np.ndarray:
def __len__(self):
    """Number of rows in the index."""
    (row_count,) = self.shape
    return row_count

@classmethod
def _from_block(cls, block: blocks.Block) -> Index:
import bigframes.dataframe as df

return Index(df.DataFrame(block))
# Index that mutates the originating dataframe/series
class FrameIndex(Index):
    """Index view over a DataFrame/Series whose label mutations write back
    to the originating frame (pandas-style ``df.index.name = ...`` semantics),
    unlike the base ``Index`` which only updates its own block.
    """

    def __init__(
        self,
        series_or_dataframe: typing.Union[
            bigframes.series.Series, bigframes.dataframe.DataFrame
        ],
    ):
        # Start as a plain Index over the frame's current block...
        super().__init__(series_or_dataframe._block)
        # ...but keep a handle to the frame so setters can mutate it.
        self._whole_frame = series_or_dataframe

    @property
    def name(self) -> blocks.Label:
        # Single-level convenience accessor; delegates to `names`.
        return self.names[0]

    @name.setter
    def name(self, value: blocks.Label):
        self.names = [value]

    @property
    def names(self) -> typing.Sequence[blocks.Label]:
        """Returns the names of the Index."""
        return self._block._index_labels

    @names.setter
    def names(self, values: typing.Sequence[blocks.Label]):
        # Push the new labels into the originating frame's block so the
        # rename is visible through the frame, then mirror it locally so
        # this Index stays in sync.
        new_block = self._whole_frame._get_block().with_index_labels(values)
        self._whole_frame._set_block(new_block)
        self._block = new_block


class IndexValue:
Expand Down Expand Up @@ -406,15 +524,6 @@ def dtypes(
def session(self) -> core.Session:
return self._expr.session

def __repr__(self) -> str:
"""Converts an Index to a string."""
# TODO(swast): Add a timeout here? If the query is taking a long time,
# maybe we just print the job metadata that we have so far?
# TODO(swast): Avoid downloading the whole index by using job
# metadata, like we do with DataFrame.
preview = self.to_pandas()
return repr(preview)

def to_pandas(self) -> pandas.Index:
"""Executes deferred operations and downloads the results."""
# Project down to only the index column. So the query can be cached to visualize other data.
Expand Down
Loading

0 comments on commit e5d054e

Please sign in to comment.