Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: Add TableDefinition wrapper for python #5892

Merged
merged 22 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions py/server/deephaven/_table_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import numpy as np
from deephaven import update_graph

from deephaven.column import Column
from deephaven.column import ColumnDefinition
from deephaven.jcompat import to_sequence
from deephaven.numpy import _column_to_numpy_array
from deephaven.table import Table
Expand All @@ -18,7 +18,7 @@

T = TypeVar('T')

def _col_defs(table: Table, cols: Union[str, Sequence[str]]) -> Sequence[Column]:
def _col_defs(table: Table, cols: Union[str, Sequence[str]]) -> Sequence[ColumnDefinition]:
if not cols:
col_defs = table.columns
else:
Expand All @@ -31,7 +31,7 @@ def _col_defs(table: Table, cols: Union[str, Sequence[str]]) -> Sequence[Column]


def _table_reader_all(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, *,
emitter: Callable[[Sequence[Column], jpy.JType], T], row_set: jpy.JType,
emitter: Callable[[Sequence[ColumnDefinition], jpy.JType], T], row_set: jpy.JType,
prev: bool = False) -> T:
""" Reads all the rows in the given row set of a table. The emitter converts the Java data into a desired Python
object.
Expand Down Expand Up @@ -103,7 +103,7 @@ def _table_reader_all_dict(table: Table, cols: Optional[Union[str, Sequence[str]


def _table_reader_chunk(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, *,
emitter: Callable[[Sequence[Column], jpy.JType], Iterable[T]], row_set: jpy.JType,
emitter: Callable[[Sequence[ColumnDefinition], jpy.JType], Iterable[T]], row_set: jpy.JType,
chunk_size: int = 2048, prev: bool = False) \
-> Generator[T, None, None]:
""" Returns a generator that reads one chunk of rows at a time from the table. The emitter converts the Java chunk
Expand Down Expand Up @@ -178,7 +178,7 @@ def _table_reader_chunk_dict(table: Table, cols: Optional[Union[str, Sequence[st
Raises:
ValueError
"""
def _emitter(col_defs: Sequence[Column], j_array: jpy.JType) -> Generator[Dict[str, np.ndarray], None, None]:
def _emitter(col_defs: Sequence[ColumnDefinition], j_array: jpy.JType) -> Generator[Dict[str, np.ndarray], None, None]:
yield {col_def.name: _column_to_numpy_array(col_def, j_array[i]) for i, col_def in enumerate(col_defs)}

return _table_reader_chunk(table, cols, emitter=_emitter, row_set=row_set, chunk_size=chunk_size, prev=prev)
Expand Down Expand Up @@ -210,9 +210,9 @@ def _table_reader_chunk_tuple(table: Table, cols: Optional[Union[str, Sequence[s
Raises:
ValueError
"""
named_tuple_class = namedtuple(tuple_name, cols or [col.name for col in table.columns], rename=False)
named_tuple_class = namedtuple(tuple_name, cols or table.column_names, rename=False)

def _emitter(col_defs: Sequence[Column], j_array: jpy.JType) -> Generator[Tuple[np.ndarray], None, None]:
def _emitter(col_defs: Sequence[ColumnDefinition], j_array: jpy.JType) -> Generator[Tuple[np.ndarray], None, None]:
yield named_tuple_class._make([_column_to_numpy_array(col_def, j_array[i]) for i, col_def in enumerate(col_defs)])

return _table_reader_chunk(table, cols, emitter=_emitter, row_set=table.j_table.getRowSet(), chunk_size=chunk_size, prev=False)
Expand Down Expand Up @@ -242,7 +242,7 @@ def _table_reader_row_dict(table: Table, cols: Optional[Union[str, Sequence[str]
Raises:
ValueError
"""
def _emitter(col_defs: Sequence[Column], j_array: jpy.JType) -> Iterable[Dict[str, Any]]:
def _emitter(col_defs: Sequence[ColumnDefinition], j_array: jpy.JType) -> Iterable[Dict[str, Any]]:
make_dict = lambda values: {col_def.name: value for col_def, value in zip(col_defs, values)}
mvs = [memoryview(j_array[i]) if col_def.data_type.is_primitive else j_array[i] for i, col_def in enumerate(col_defs)]
return map(make_dict, zip(*mvs))
Expand Down Expand Up @@ -275,9 +275,9 @@ def _table_reader_row_tuple(table: Table, cols: Optional[Union[str, Sequence[str
Raises:
ValueError
"""
named_tuple_class = namedtuple(tuple_name, cols or [col.name for col in table.columns], rename=False)
named_tuple_class = namedtuple(tuple_name, cols or table.column_names, rename=False)

def _emitter(col_defs: Sequence[Column], j_array: jpy.JType) -> Iterable[Tuple[Any, ...]]:
def _emitter(col_defs: Sequence[ColumnDefinition], j_array: jpy.JType) -> Iterable[Tuple[Any, ...]]:
mvs = [memoryview(j_array[i]) if col_def.data_type.is_primitive else j_array[i] for i, col_def in enumerate(col_defs)]
return map(named_tuple_class._make, zip(*mvs))

Expand Down
184 changes: 145 additions & 39 deletions py/server/deephaven/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@

""" This module implements the Column class and functions that work with Columns. """

from dataclasses import dataclass, field
from enum import Enum
from typing import Sequence, Any
from functools import cached_property
from typing import Sequence, Any, Optional
from warnings import warn

import jpy

import deephaven.dtypes as dtypes
from deephaven import DHError
from deephaven.dtypes import DType
from deephaven.dtypes import _instant_array
from deephaven.dtypes import DType, _instant_array, from_jtype
from deephaven._wrapper import JObjectWrapper

_JColumnHeader = jpy.get_type("io.deephaven.qst.column.header.ColumnHeader")
_JColumn = jpy.get_type("io.deephaven.qst.column.Column")
Expand All @@ -32,46 +33,151 @@ def __repr__(self):
return self.name


@dataclass
class Column:
""" A Column object represents a column definition in a Deephaven Table. """
name: str
data_type: DType
component_type: DType = None
column_type: ColumnType = ColumnType.NORMAL
class ColumnDefinition(JObjectWrapper):
"""A Deephaven column definition."""

@property
def j_column_header(self):
return _JColumnHeader.of(self.name, self.data_type.qst_type)
j_object_type = _JColumnDefinition

def __init__(self, j_column_definition: jpy.JType):
self.j_column_definition = j_column_definition

@property
def j_column_definition(self):
if hasattr(self.data_type.j_type, 'jclass'):
j_data_type = self.data_type.j_type.jclass
else:
j_data_type = self.data_type.qst_type.clazz()
j_component_type = self.component_type.qst_type.clazz() if self.component_type else None
j_column_type = self.column_type.value
return _JColumnDefinition.fromGenericType(self.name, j_data_type, j_component_type, j_column_type)


@dataclass
class InputColumn(Column):
""" An InputColumn represents a user defined column with some input data. """
input_data: Any = field(default=None)

def __post_init__(self):
def j_object(self) -> jpy.JType:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pydoc

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we pydoc any of our j_object property methods?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, but do a quick check.

return self.j_column_definition

@cached_property
def name(self) -> str:
chipkent marked this conversation as resolved.
Show resolved Hide resolved
"""The column name."""
return self.j_column_definition.getName()

@cached_property
def data_type(self) -> DType:
chipkent marked this conversation as resolved.
Show resolved Hide resolved
"""The column data type."""
return from_jtype(self.j_column_definition.getDataType())

@cached_property
def component_type(self) -> Optional[DType]:
"""The column component type."""
return from_jtype(self.j_column_definition.getComponentType())

@cached_property
def column_type(self) -> ColumnType:
chipkent marked this conversation as resolved.
Show resolved Hide resolved
"""The column type."""
return ColumnType(self.j_column_definition.getColumnType())


class Column(ColumnDefinition):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is Column needed anymore?

Copy link
Member Author

@devinrsmith devinrsmith Aug 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be a breaking change to remove, as users may be relying on the Column constructor, for example

some_method([Column("Foo", dtypes.int32), Column("Bar", dtypes.str)])

I would be in favor of deprecating for eventual removal, suggesting users use ColumnDefinition instead.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm ok with deprecation with a plan to remove in the next release. Create a ticket for removal and assign it to one of us.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"""A Column object represents a column definition in a Deephaven Table. Deprecated for removal next release, prefer col_def."""

def __init__(
self,
name: str,
data_type: DType,
component_type: DType = None,
column_type: ColumnType = ColumnType.NORMAL,
):
"""Deprecated for removal next release, prefer col_def."""
warn(
"Column is deprecated for removal next release, prefer col_def",
DeprecationWarning,
stacklevel=2,
)
super().__init__(
col_def(name, data_type, component_type, column_type).j_column_definition
)


class InputColumn:
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
"""An InputColumn represents a user defined column with some input data."""

def __init__(
chipkent marked this conversation as resolved.
Show resolved Hide resolved
self,
name: str = None,
data_type: DType = None,
component_type: DType = None,
column_type: ColumnType = ColumnType.NORMAL,
input_data: Any = None,
):
"""Creates an InputColumn.
Args:
name (str): the column name
data_type (DType): the column data type
component_type (Optional[DType]): the column component type, None by default
column_type (ColumnType): the column type, NORMAL by default
input_data: Any: the input data, by default is None

Returns:
a new InputColumn

Raises:
DHError
"""
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
try:
if self.input_data is None:
self.j_column = _JColumn.empty(self.j_column_header)
else:
if self.data_type.is_primitive:
self.j_column = _JColumn.ofUnsafe(self.name, dtypes.array(self.data_type, self.input_data,
remap=dtypes.null_remap(self.data_type)))
else:
self.j_column = _JColumn.of(self.j_column_header, dtypes.array(self.data_type, self.input_data))
self._column_definition = col_def(
name, data_type, component_type, column_type
)
self.j_column = self._to_j_column(input_data)
except Exception as e:
raise DHError(e, f"failed to create an InputColumn ({self.name}).") from e
raise DHError(e, f"failed to create an InputColumn ({name}).") from e

def _to_j_column(self, input_data: Any = None) -> jpy.JType:
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
if input_data is None:
return _JColumn.empty(
_JColumnHeader.of(
self._column_definition.name,
self._column_definition.data_type.qst_type,
)
)
if self._column_definition.data_type.is_primitive:
return _JColumn.ofUnsafe(
self._column_definition.name,
dtypes.array(
self._column_definition.data_type,
input_data,
remap=dtypes.null_remap(self._column_definition.data_type),
),
)
return _JColumn.of(
_JColumnHeader.of(
self._column_definition.name, self._column_definition.data_type.qst_type
),
dtypes.array(self._column_definition.data_type, input_data),
)


def col_def(
name: str,
data_type: DType,
component_type: Optional[DType] = None,
column_type: ColumnType = ColumnType.NORMAL,
) -> ColumnDefinition:
"""Creates a ColumnDefinition.

Args:
name (str): the column name
data_type (DType): the column data type
component_type (Optional[DType]): the column component type, None by default
column_type (ColumnType): the column type, ColumnType.NORMAL by default

Returns:
a new ColumnDefinition

Raises:
DHError
"""
try:
return ColumnDefinition(
_JColumnDefinition.fromGenericType(
name,
data_type.j_type.jclass
if hasattr(data_type.j_type, "jclass")
else data_type.qst_type.clazz(),
component_type.qst_type.clazz() if component_type else None,
column_type.value,
)
)
except Exception as e:
raise DHError(e, f"failed to create a ColumnDefinition ({name}).") from e


def bool_col(name: str, data: Sequence) -> InputColumn:
Expand Down
13 changes: 4 additions & 9 deletions py/server/deephaven/experimental/iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,8 @@

from deephaven import DHError
from deephaven._wrapper import JObjectWrapper
from deephaven.column import Column
from deephaven.dtypes import DType
from deephaven.experimental import s3

from deephaven.jcompat import j_table_definition

from deephaven.table import Table
from deephaven.table import Table, TableDefinition, TableDefinitionLike

_JIcebergInstructions = jpy.get_type("io.deephaven.iceberg.util.IcebergInstructions")
_JIcebergCatalogAdapter = jpy.get_type("io.deephaven.iceberg.util.IcebergCatalogAdapter")
Expand All @@ -39,14 +34,14 @@ class IcebergInstructions(JObjectWrapper):
j_object_type = _JIcebergInstructions

def __init__(self,
table_definition: Optional[Union[Dict[str, DType], List[Column]]] = None,
table_definition: Optional[TableDefinitionLike] = None,
data_instructions: Optional[s3.S3Instructions] = None,
column_renames: Optional[Dict[str, str]] = None):
"""
Initializes the instructions using the provided parameters.

Args:
table_definition (Optional[Union[Dict[str, DType], List[Column], None]]): the table definition; if omitted,
table_definition (Optional[TableDefinitionLike]): the table definition; if omitted,
the definition is inferred from the Iceberg schema. Setting a definition guarantees the returned table
will have that definition. This is useful for specifying a subset of the Iceberg schema columns.
data_instructions (Optional[s3.S3Instructions]): Special instructions for reading data files, useful when
Expand All @@ -62,7 +57,7 @@ def __init__(self,
builder = self.j_object_type.builder()

if table_definition is not None:
builder.tableDefinition(j_table_definition(table_definition))
builder.tableDefinition(TableDefinition(table_definition).j_table_definition)

if data_instructions is not None:
builder.dataInstructions(data_instructions.j_object)
Expand Down
Loading
Loading