From e38eb83dc659117e534a99dbc69e8c20a94a7906 Mon Sep 17 00:00:00 2001 From: Jim Crist-Harif Date: Thu, 5 Dec 2024 12:37:29 -0600 Subject: [PATCH] refactor(impala): remove impala table (#9840) This removes the `ImpalaTable` subclass. Relevant methods that were on that class have been moved to the `Backend` class instead. We used to have backend-specific subclasses of `Table` that added some backend-specific methods. Most of those were removed in prior releases, `ImpalaTable` was the last remaining one. As this is a breaking change, we should hold off on merging this until we're ready to start prepping for a 10.0 release. Depends on #9839. Fixes #9365. --------- Co-authored-by: Gil Forsyth --- ibis/backends/impala/__init__.py | 277 +++++++++++--- ibis/backends/impala/client.py | 362 ------------------ ibis/backends/impala/ddl.py | 150 ++------ ibis/backends/impala/tests/test_ddl.py | 89 +---- .../impala/tests/test_ddl_compilation.py | 32 -- .../backends/impala/tests/test_parquet_ddl.py | 5 +- ibis/backends/impala/tests/test_partition.py | 105 ++--- ibis/backends/impala/tests/test_patched.py | 34 +- 8 files changed, 309 insertions(+), 745 deletions(-) delete mode 100644 ibis/backends/impala/client.py diff --git a/ibis/backends/impala/__init__.py b/ibis/backends/impala/__init__.py index 88ef7d1d3402..aefa5840cafb 100644 --- a/ibis/backends/impala/__init__.py +++ b/ibis/backends/impala/__init__.py @@ -20,18 +20,6 @@ import ibis.expr.types as ir from ibis import util from ibis.backends.impala import ddl, udf -from ibis.backends.impala.client import ImpalaTable -from ibis.backends.impala.ddl import ( - CTAS, - CreateDatabase, - CreateTableWithSchema, - CreateView, - DropDatabase, - DropTable, - DropView, - RenameTable, - TruncateTable, -) from ibis.backends.impala.udf import ( aggregate_function, scalar_function, @@ -308,7 +296,7 @@ def create_database(self, name, path=None, force=False): Forcibly create the database """ - statement = CreateDatabase(name, path=path, can_exist=force) + statement = ddl.CreateDatabase(name, path=path, can_exist=force) self._safe_exec_sql(statement) def drop_database(self, name, force=False): @@ -356,7 +344,7 @@ def drop_database(self, name, force=False): f"Database {name} must be empty before " "being dropped, or set force=True" ) - statement = DropDatabase(name, must_exist=not force) + statement = ddl.DropDatabase(name, must_exist=not force) self._safe_exec_sql(statement) def get_schema( @@ -443,18 +431,14 @@ def create_view( overwrite: bool = False, ) -> ir.Table: select = self.compile(obj) - statement = CreateView(name, select, database=database, can_exist=overwrite) + statement = ddl.CreateView(name, select, database=database, can_exist=overwrite) self._safe_exec_sql(statement) return self.table(name, database=database) def drop_view(self, name, database=None, force=False): - stmt = DropView(name, database=database, must_exist=not force) + stmt = ddl.DropView(name, database=database, must_exist=not force) self._safe_exec_sql(stmt) - def table(self, name: str, database: str | None = None, **kwargs: Any) -> ir.Table: - expr = super().table(name, database=database, **kwargs) - return ImpalaTable(expr.op()) - def create_table( self, name: str, @@ -508,7 +492,6 @@ def create_table( Table properties to set on table creation. like_parquet Can specify instead of a schema - """ if obj is None and schema is None: raise com.IbisError("The schema or obj parameter is required") @@ -534,22 +517,22 @@ def create_table( self.drop_table(name, force=True) self._safe_exec_sql( - CTAS( + ddl.CTAS( name, select, database=database or self.current_database, format=format, external=True if location is not None else external, partition=partition, - tbl_properties=tbl_properties, path=location, + tbl_properties=tbl_properties, ) ) else: # schema is not None if overwrite: self.drop_table(name, force=True) self._safe_exec_sql( - CreateTableWithSchema( + ddl.CreateTableWithSchema( name, schema or obj.schema(), database=database or self.current_database, @@ -564,7 +547,7 @@ def create_table( def avro_file( self, directory, avro_schema, name=None, database=None, external=True - ): + ) -> ir.Table: """Create a table to read a collection of Avro data. Parameters @@ -582,9 +565,8 @@ def avro_file( Returns ------- - ImpalaTable - Impala table expression - + Table + Table expression """ name, database = self._get_concrete_table_path(name, database) @@ -605,7 +587,7 @@ def delimited_file( escapechar=None, lineterminator=None, external=True, - ): + ) -> ir.Table: """Interpret delimited text files as an Ibis table expression. See the `parquet_file` method for more details on what happens under @@ -634,9 +616,8 @@ def delimited_file( Returns ------- - ImpalaTable - Impala table expression - + Table + Table expression """ name, database = self._get_concrete_table_path(name, database) @@ -663,7 +644,7 @@ def parquet_file( external: bool = True, like_file: str | Path | None = None, like_table: str | None = None, - ): + ) -> ir.Table: """Create an Ibis table from the passed directory of Parquet files. The table can be optionally named, otherwise a unique name will be @@ -694,9 +675,8 @@ def parquet_file( Returns ------- - ImpalaTable - Impala table expression - + Table + Table expression """ name, database = self._get_concrete_table_path(name, database) @@ -744,34 +724,90 @@ def insert( database=None, overwrite=False, partition=None, - values=None, validate=True, - ): - """Insert data into an existing table. + ) -> None: + """Insert into an Impala table. - See - [`ImpalaTable.insert`](../backends/impala.qmd#ibis.backends.impala.client.ImpalaTable.insert) - for parameters. + Parameters + ---------- + table_name + The table name + obj + Table expression or DataFrame + database + The table database + overwrite + If True, will replace existing contents of table + partition + For partitioned tables, indicate the partition that's being + inserted into, either with an ordered list of partition keys or a + dict of partition field name to value. For example for the + partition (year=2007, month=7), this can be either (2007, 7) or + {'year': 2007, 'month': 7}. + validate + If True, do more rigorous validation that schema of table being + inserted is compatible with the existing table Examples -------- - >>> table = "my_table" - >>> con.insert(table, table_expr) # quartodoc: +SKIP # doctest: +SKIP + Append to an existing table + + >>> con.insert(table_name, table_expr) # quartodoc: +SKIP # doctest: +SKIP Completely overwrite contents - >>> con.insert(table, table_expr, overwrite=True) # quartodoc: +SKIP # doctest: +SKIP + + >>> con.insert(table_name, table_expr, overwrite=True) # quartodoc: +SKIP # doctest: +SKIP """ if isinstance(obj, ir.Table): self._run_pre_execute_hooks(obj) + table = self.table(table_name, database=database) - return table.insert( - obj=obj, - overwrite=overwrite, + + if not isinstance(obj, ir.Table): + obj = ibis.memtable(obj) + + if not set(table.columns).difference(obj.columns): + # project out using column order of parent table + # if column names match + obj = obj.select(table.columns) + + self._run_pre_execute_hooks(obj) + + if validate: + existing_schema = table.schema() + insert_schema = obj.schema() + if not insert_schema.equals(existing_schema): + if set(insert_schema.names) != set(existing_schema.names): + raise com.IbisInputError("Schemas have different names") + + for name in insert_schema: + lt = insert_schema[name] + rt = existing_schema[name] + if not lt.castable(rt): + raise com.IbisInputError(f"Cannot safely cast {lt!r} to {rt!r}") + + if partition is not None: + partition_schema = self.get_partition_schema(table_name, database=database) + partition_schema_names = frozenset(partition_schema.names) + obj = obj.select( + [ + column + for column in obj.columns + if column not in partition_schema_names + ] + ) + else: + partition_schema = None + + statement = ddl.InsertSelect( + self._fully_qualified_name(table_name, database), + self.compile(obj), partition=partition, - values=values, - validate=validate, + partition_schema=partition_schema, + overwrite=overwrite, ) + self._safe_exec_sql(statement.compile()) def drop_table( self, name: str, *, database: str | None = None, force: bool = False @@ -794,7 +830,7 @@ def drop_table( >>> con.drop_table(table, database=db, force=True) # quartodoc: +SKIP # doctest: +SKIP """ - statement = DropTable(name, database=database, must_exist=not force) + statement = ddl.DropTable(name, database=database, must_exist=not force) self._safe_exec_sql(statement) def truncate_table(self, name: str, database: str | None = None) -> None: @@ -808,7 +844,7 @@ def truncate_table(self, name: str, database: str | None = None) -> None: Database name """ - statement = TruncateTable(name, database=database) + statement = ddl.TruncateTable(name, database=database) self._safe_exec_sql(statement) def rename_table(self, old_name: str, new_name: str) -> None: @@ -822,7 +858,7 @@ def rename_table(self, old_name: str, new_name: str) -> None: The new name of the table. """ - statement = RenameTable(old_name, new_name) + statement = ddl.RenameTable(old_name, new_name) self._safe_exec_sql(statement) def drop_table_or_view(self, name, *, database=None, force=False): @@ -1142,6 +1178,141 @@ def list_partitions(self, name, database=None): stmt = self._table_command("SHOW PARTITIONS", name, database=database) return self._exec_statement(stmt) + def get_partition_schema( + self, + table_name: str, + database: str | None = None, + ) -> sch.Schema: + """Return the schema for the partition columns. + + Parameters + ---------- + table_name + Table name + database + Database name + + Returns + ------- + Schema + Ibis schema for the partition columns + """ + schema = self.get_schema(table_name, database=database) + result = self.list_partitions(table_name, database) + + partition_fields = [] + for col in result.columns: + if col not in schema: + break + partition_fields.append((col, schema[col])) + + return sch.Schema(dict(partition_fields)) + + def add_partition( + self, + table_name: str, + spec: dict[str, Any] | list, + *, + database: str | None = None, + location: str | None = None, + ) -> None: + """Add a new table partition. + + Partition parameters can be set in a single DDL statement or you can + use `alter_partition` to set them after the fact. + + Parameters + ---------- + table_name + The table name. + spec + The partition keys for the partition being added. + database + The database name. If not provided, the current database is used. + location + Location of the partition + """ + part_schema = self.get_partition_schema(table_name, database) + stmt = ddl.AddPartition( + self._fully_qualified_name(table_name, database), + spec, + part_schema, + location=location, + ) + self._safe_exec_sql(stmt) + + def drop_partition( + self, + table_name: str, + spec: dict[str, Any] | list, + *, + database: str | None = None, + ) -> None: + """Drop an existing table partition. + + Parameters + ---------- + table_name + The table name. + spec + The partition keys for the partition being dropped. + database + The database name. If not provided, the current database is used. + """ + part_schema = self.get_partition_schema(table_name, database) + stmt = ddl.DropPartition( + self._fully_qualified_name(table_name, database), + spec, + part_schema, + ) + self._safe_exec_sql(stmt) + + def alter_partition( + self, + table_name: str, + spec: dict[str, Any] | list, + *, + database: str | None = None, + location: str | None = None, + format: str | None = None, + tbl_properties: dict | None = None, + serde_properties: dict | None = None, + ) -> None: + """Change settings and parameters of an existing partition. + + Parameters + ---------- + table_name + The table name + spec + The partition keys for the partition being modified + database + The database name. If not provided, the current database is used. + location + Location of the partition + format + Table format + tbl_properties + Table properties + serde_properties + Serialization/deserialization properties + """ + part_schema = self.get_partition_schema(table_name, database) + + alterations = [ + ("location", location), + ("format", format), + ("tbl_properties", tbl_properties), + ("serde_properties", serde_properties), + ] + + qname = self._fully_qualified_name(table_name, database) + + for field, values in alterations: + if values is not None: + stmt = ddl.AlterPartition(qname, spec, part_schema, **{field, values}) + self._safe_exec_sql(stmt) + def table_stats(self, name, database=None): """Return results of `SHOW TABLE STATS` for the table `name`.""" stmt = self._table_command("SHOW TABLE STATS", name, database=database) diff --git a/ibis/backends/impala/client.py b/ibis/backends/impala/client.py deleted file mode 100644 index 4dfeaa494fda..000000000000 --- a/ibis/backends/impala/client.py +++ /dev/null @@ -1,362 +0,0 @@ -from __future__ import annotations - -from typing import TYPE_CHECKING - -import sqlglot as sg - -import ibis -import ibis.common.exceptions as com -import ibis.expr.schema as sch -import ibis.expr.types as ir -from ibis.backends.impala import ddl -from ibis.backends.impala.ddl import AlterTable, InsertSelect - -if TYPE_CHECKING: - import pandas as pd - - -class ImpalaTable(ir.Table): - """A physical table in the Impala-Hive metastore.""" - - @property - def _qualified_name(self) -> str: - op = self.op() - return sg.table(op.name, catalog=op.namespace.database).sql(dialect="hive") - - @property - def _unqualified_name(self) -> str: - return self.op().name - - @property - def _client(self): - return self.op().source - - @property - def _database(self) -> str: - return self.op().namespace.database - - def compute_stats(self, incremental=False): - """Invoke Impala COMPUTE STATS command on the table.""" - return self._client.compute_stats( - self.op().name, database=self._database, incremental=incremental - ) - - def invalidate_metadata(self): - self._client.invalidate_metadata(self.op().name, database=self._database) - - def refresh(self): - self._client.refresh(self.op().name, database=self._database) - - def metadata(self): - """Return results of `DESCRIBE FORMATTED` statement.""" - return self._client.describe_formatted(self.op().name, database=self._database) - - describe_formatted = metadata - - def files(self): - """Return results of SHOW FILES statement.""" - return self._client.show_files(self.op().name, database=self._database) - - def drop(self): - """Drop the table from the database.""" - self._client.drop_table_or_view(self.op().name, database=self._database) - - def truncate(self): - self._client.truncate_table(self.op().name, database=self._database) - - def insert( - self, - obj=None, - overwrite=False, - partition=None, - values=None, - validate=True, - ): - """Insert into an Impala table. - - Parameters - ---------- - obj - Table expression or DataFrame - overwrite - If True, will replace existing contents of table - partition - For partitioned tables, indicate the partition that's being - inserted into, either with an ordered list of partition keys or a - dict of partition field name to value. For example for the - partition (year=2007, month=7), this can be either (2007, 7) or - {'year': 2007, 'month': 7}. - values - Unsupported and unused - validate - If True, do more rigorous validation that schema of table being - inserted is compatible with the existing table - - Examples - -------- - Append to an existing table - - >>> t.insert(table_expr) # quartodoc: +SKIP # doctest: +SKIP - - Completely overwrite contents - - >>> t.insert(table_expr, overwrite=True) # quartodoc: +SKIP # doctest: +SKIP - - """ - if values is not None: - raise NotImplementedError - - if not isinstance(obj, ir.Table): - obj = ibis.memtable(obj) - - if not set(self.columns).difference(obj.columns): - # project out using column order of parent table - # if column names match - obj = obj.select(self.columns) - - self._client._run_pre_execute_hooks(obj) - - expr = obj - if validate: - existing_schema = self.schema() - insert_schema = expr.schema() - if not insert_schema.equals(existing_schema): - _validate_compatible(insert_schema, existing_schema) - - if partition is not None: - partition_schema = self.partition_schema() - partition_schema_names = frozenset(partition_schema.names) - expr = expr.select( - [ - column - for column in expr.columns - if column not in partition_schema_names - ] - ) - else: - partition_schema = None - - statement = InsertSelect( - self._qualified_name, - self._client.compile(expr), - partition=partition, - partition_schema=partition_schema, - overwrite=overwrite, - ) - self._client._safe_exec_sql(statement.compile()) - return self - - def load_data(self, path, overwrite=False, partition=None): - """Load data into an Impala table. - - Parameters - ---------- - path - Data to load - overwrite - Overwrite the existing data in the entire table or indicated - partition - partition - If specified, the partition must already exist - - """ - if partition is not None: - partition_schema = self.partition_schema() - else: - partition_schema = None - - stmt = ddl.LoadData( - self._qualified_name, - path, - partition=partition, - partition_schema=partition_schema, - overwrite=overwrite, - ) - - self._client._safe_exec_sql(stmt.compile()) - return self - - @property - def name(self) -> str: - return self.op().name - - @property - def is_partitioned(self): - """True if the table is partitioned.""" - return self.metadata().is_partitioned - - def partition_schema(self): - """Return the schema for the partition columns.""" - schema = self.schema() - result = self.partitions() - - partition_fields = [] - for col in result.columns: - if col not in schema: - break - partition_fields.append((col, schema[col])) - - return sch.Schema(dict(partition_fields)) - - def add_partition(self, spec, location=None): - """Add a new table partition. - - Partition parameters can be set in a single DDL statement or you can - use `alter_partition` to set them after the fact. - """ - part_schema = self.partition_schema() - stmt = ddl.AddPartition( - self._qualified_name, spec, part_schema, location=location - ) - self._client._safe_exec_sql(stmt) - return self - - def alter( - self, - location=None, - format=None, - tbl_properties=None, - serde_properties=None, - ): - """Change settings and parameters of the table. - - Parameters - ---------- - location - For partitioned tables, you may want the alter_partition function - format - Table format - tbl_properties - Table properties - serde_properties - Serialization/deserialization properties - - """ - - def _run_ddl(**kwds): - stmt = AlterTable(self._qualified_name, **kwds) - self._client._safe_exec_sql(stmt) - return self - - return self._alter_table_helper( - _run_ddl, - location=location, - format=format, - tbl_properties=tbl_properties, - serde_properties=serde_properties, - ) - - def set_external(self, is_external=True): - """Toggle the `EXTERNAL` table property.""" - self.alter(tbl_properties={"EXTERNAL": is_external}) - - def alter_partition( - self, - spec, - location=None, - format=None, - tbl_properties=None, - serde_properties=None, - ): - """Change settings and parameters of an existing partition. - - Parameters - ---------- - spec - The partition keys for the partition being modified - location - Location of the partition - format - Table format - tbl_properties - Table properties - serde_properties - Serialization/deserialization properties - - """ - part_schema = self.partition_schema() - - def _run_ddl(**kwds): - stmt = ddl.AlterPartition(self._qualified_name, spec, part_schema, **kwds) - self._client._safe_exec_sql(stmt) - return self - - return self._alter_table_helper( - _run_ddl, - location=location, - format=format, - tbl_properties=tbl_properties, - serde_properties=serde_properties, - ) - - def _alter_table_helper(self, f, **alterations): - results = [] - for k, v in alterations.items(): - if v is None: - continue - result = f(**{k: v}) - results.append(result) - return results - - def drop_partition(self, spec): - """Drop an existing table partition.""" - part_schema = self.partition_schema() - stmt = ddl.DropPartition(self._qualified_name, spec, part_schema) - self._client._safe_exec_sql(stmt) - return self - - def partitions(self): - """Return information about the table's partitions. - - Raises an exception if the table is not partitioned. - """ - return self._client.list_partitions(self._qualified_name) - - def stats(self) -> pd.DataFrame: - """Return results of `SHOW TABLE STATS`. - - If not partitioned, contains only one row. - - Returns - ------- - DataFrame - Table statistics - - """ - return self._client.table_stats(self._qualified_name) - - def column_stats(self) -> pd.DataFrame: - """Return results of `SHOW COLUMN STATS`. - - Returns - ------- - DataFrame - Column statistics - - """ - return self._client.column_stats(self._qualified_name) - - -# ---------------------------------------------------------------------- -# ORM-ish usability layer - - -class ScalarFunction: - def drop(self): - pass - - -class AggregateFunction: - def drop(self): - pass - - -def _validate_compatible(from_schema, to_schema): - if set(from_schema.names) != set(to_schema.names): - raise com.IbisInputError("Schemas have different names") - - for name in from_schema: - lt = from_schema[name] - rt = to_schema[name] - if not lt.castable(rt): - raise com.IbisInputError(f"Cannot safely cast {lt!r} to {rt!r}") diff --git a/ibis/backends/impala/ddl.py b/ibis/backends/impala/ddl.py index 39a47ef25de3..88b6ece8615b 100644 --- a/ibis/backends/impala/ddl.py +++ b/ibis/backends/impala/ddl.py @@ -162,51 +162,7 @@ def _pieces(self): yield self._tbl_properties() -class AlterTable(ImpalaBase, DDL): - def __init__( - self, - table, - location=None, - format=None, - tbl_properties=None, - serde_properties=None, - ): - self.table = table - self.location = location - self.format = self.sanitize_format(format) - self.tbl_properties = tbl_properties - self.serde_properties = serde_properties - - def _wrap_command(self, cmd): - return f"ALTER TABLE {cmd}" - - def _format_properties(self, prefix=""): - tokens = [] - - if self.location is not None: - tokens.append(f"LOCATION '{self.location}'") - - if self.format is not None: - tokens.append(f"FILEFORMAT {self.format}") - - if self.tbl_properties is not None: - tokens.append(self.format_tblproperties(self.tbl_properties)) - - if self.serde_properties is not None: - tokens.append(self.format_serdeproperties(self.serde_properties)) - - if len(tokens) > 0: - return "\n{}{}".format(prefix, "\n".join(tokens)) - else: - return "" - - def compile(self): - props = self._format_properties() - action = f"{self.table} SET {props}" - return self._wrap_command(action) - - -class RenameTable(AlterTable): +class RenameTable(ImpalaBase, DDL): def __init__( self, old_name: str, @@ -222,7 +178,7 @@ def __init__( ) def compile(self): - return self._wrap_command(f"{self._old} RENAME TO {self._new}") + return f"ALTER TABLE {self._old} RENAME TO {self._new}" class DropTable(ImpalaBase, DropObject): @@ -443,45 +399,10 @@ def _pieces(self): yield "\n".join(self.table_format.to_ddl()) -class LoadData(ImpalaBase, DDL): - """Generate DDL for LOAD DATA command. - - Cannot be cancelled - """ +class PartitionProperties(ImpalaBase, DDL): + _command = "" + _property_prefix = "" - def __init__( - self, - table_name, - path, - database=None, - partition=None, - partition_schema=None, - overwrite=False, - ): - self.table_name = table_name - self.database = database - self.path = path - - self.partition = partition - self.partition_schema = partition_schema - - self.overwrite = overwrite - - def compile(self): - overwrite = "OVERWRITE " if self.overwrite else "" - - if self.partition is not None: - partition = "\n" + self.format_partition( - self.partition, self.partition_schema - ) - else: - partition = "" - - scoped_name = self.scoped_name(self.table_name, self.database) - return f"LOAD DATA INPATH {self.path!r} {overwrite}INTO TABLE {scoped_name}{partition}" - - -class PartitionProperties(AlterTable): def __init__( self, table, @@ -492,51 +413,56 @@ def __init__( tbl_properties=None, serde_properties=None, ): - super().__init__( - table, - location=location, - format=format, - tbl_properties=tbl_properties, - serde_properties=serde_properties, - ) + self.table = table + self.location = location + self.format = self.sanitize_format(format) + self.tbl_properties = tbl_properties + self.serde_properties = serde_properties self.partition = partition self.partition_schema = partition_schema - def _compile(self, cmd, property_prefix=""): + def compile(self): part = self.format_partition(self.partition, self.partition_schema) - if cmd: - part = f"{cmd} {part}" + if self._command: + part = f"{self._command} {part}" - props = self._format_properties(property_prefix) - action = f"{self.table} {part}{props}" - return self._wrap_command(action) + props = self._format_properties() + return f"ALTER TABLE {self.table} {part}{props}" + def _format_properties(self): + tokens = [] -class AddPartition(PartitionProperties): - dialect = "hive" + if self.location is not None: + tokens.append(f"LOCATION '{self.location}'") - def __init__(self, table, partition, partition_schema, location=None): - super().__init__(table, partition, partition_schema, location=location) + if self.format is not None: + tokens.append(f"FILEFORMAT {self.format}") - def compile(self): - return self._compile("ADD") + if self.tbl_properties is not None: + tokens.append(self.format_tblproperties(self.tbl_properties)) + if self.serde_properties is not None: + tokens.append(self.format_serdeproperties(self.serde_properties)) -class AlterPartition(PartitionProperties): - dialect = "hive" + if len(tokens) > 0: + return "\n{}{}".format(self._property_prefix, "\n".join(tokens)) + else: + return "" - def compile(self): - return self._compile("", "SET ") + +class AddPartition(PartitionProperties): + dialect = "hive" + _command = "ADD" -class DropPartition(PartitionProperties): +class AlterPartition(PartitionProperties): dialect = "hive" + _property_prefix = "SET " - def __init__(self, table, partition, partition_schema): - super().__init__(table, partition, partition_schema) - def compile(self): - return self._compile("DROP") +class DropPartition(PartitionProperties): + dialect = "hive" + _command = "DROP" class CacheTable(ImpalaBase, DDL): diff --git a/ibis/backends/impala/tests/test_ddl.py b/ibis/backends/impala/tests/test_ddl.py index ad74932176b5..fc168b53f691 100644 --- a/ibis/backends/impala/tests/test_ddl.py +++ b/ibis/backends/impala/tests/test_ddl.py @@ -93,16 +93,6 @@ def test_truncate_table(con, alltypes, temp_table): assert not nrows -def test_truncate_table_expression(con, alltypes, temp_table): - expr = alltypes.limit(1) - - con.create_table(temp_table, obj=expr) - t = con.table(temp_table) - t.truncate() - nrows = t.count().execute() - assert not nrows - - def test_ctas_from_table_expr(con, alltypes, temp_table): t = con.create_table(temp_table, alltypes) assert t.count().execute() @@ -130,19 +120,15 @@ def test_insert_table(con, alltypes, temp_table, test_data_db): expr = alltypes db = test_data_db - con.create_table(temp_table, expr.limit(0), database=db) - - con.insert(temp_table, expr.limit(10), database=db) + t = con.create_table(temp_table, expr.limit(0), database=db) - # check using ImpalaTable.insert - t = con.table(temp_table, database=db) - t.insert(expr.limit(10)) + con.insert(temp_table, expr.limit(20), database=db) sz = t.count() assert sz.execute() == 20 # Overwrite and verify only 10 rows now - t.insert(expr.limit(10), overwrite=True) + con.insert(temp_table, expr.limit(10), database=db, overwrite=True) assert sz.execute() == 10 @@ -157,19 +143,12 @@ def test_insert_validate_types(con, alltypes, test_data_db, temp_table): database=db, ) - t = con.table(temp_table, database=db) - - to_insert = expr.select( - expr.tinyint_col, expr.smallint_col.name("int_col"), expr.string_col - ) - t.insert(to_insert.limit(10)) - to_insert = expr.select( expr.tinyint_col, expr.smallint_col.cast("int32").name("int_col"), expr.string_col, ) - t.insert(to_insert.limit(10)) + con.insert(temp_table, to_insert.limit(10), database=db) to_insert = expr.select( expr.tinyint_col, expr.bigint_col.name("int_col"), expr.string_col @@ -177,16 +156,7 @@ def test_insert_validate_types(con, alltypes, test_data_db, temp_table): limit_expr = to_insert.limit(10) with pytest.raises(com.IbisError): - t.insert(limit_expr) - - -def test_compute_stats(con): - t = con.table("functional_alltypes") - - t.compute_stats() - t.compute_stats(incremental=True) - - con.compute_stats("functional_alltypes") + con.insert(temp_table, limit_expr, database=db) @pytest.fixture @@ -202,53 +172,6 @@ def test_drop_view(con, created_view): assert created_view not in con.list_tables() -@pytest.fixture -def path_uuid(): - return f"change-location-{util.guid()}" - - -@pytest.fixture -def table(con, tmp_dir, path_uuid): - table_name = f"table_{util.guid()}" - fake_path = pjoin(tmp_dir, path_uuid) - schema = ibis.schema([("foo", "string"), ("bar", "int64")]) - yield con.create_table( - table_name, schema=schema, format="parquet", external=True, location=fake_path - ) - con.drop_table(table_name) - - -def test_change_location(table, tmp_dir, path_uuid): - old_loc = table.metadata().location - - new_path = pjoin(tmp_dir, "new-path") - table.alter(location=new_path) - - new_loc = table.metadata().location - assert new_loc == old_loc.replace(path_uuid, "new-path") - - -def test_change_properties(table): - props = {"foo": "1", "bar": "2"} - - table.alter(tbl_properties=props) - tbl_props = table.metadata().tbl_properties - for k, v in props.items(): - assert v == tbl_props[k] - - table.alter(serde_properties=props) - serde_props = table.metadata().serde_properties - for k, v in props.items(): - assert v == serde_props[k] - - -def test_change_format(table): - table.alter(format="avro") - - meta = table.metadata() - assert "Avro" in meta.hive_format - - def test_query_avro(con, test_data_dir): hdfs_path = pjoin(test_data_dir, "directory/avro/tpch/region") @@ -265,7 +188,7 @@ def test_query_avro(con, test_data_dir): table = con.avro_file(hdfs_path, avro_schema) # table exists - assert table._qualified_name in con.list_tables() + assert table.get_name() in con.list_tables() expr = table.r_name.value_counts() expr.execute() diff --git a/ibis/backends/impala/tests/test_ddl_compilation.py b/ibis/backends/impala/tests/test_ddl_compilation.py index d6f386ec0cbe..46e71f4f0a70 100644 --- a/ibis/backends/impala/tests/test_ddl_compilation.py +++ b/ibis/backends/impala/tests/test_ddl_compilation.py @@ -42,38 +42,6 @@ def test_select_basics(t, snapshot): snapshot.assert_match(result, "out2.sql") -def test_load_data_unpartitioned(snapshot): - path = "/path/to/data" - stmt = ddl.LoadData("functional_alltypes", path, database="foo") - - result = stmt.compile() - snapshot.assert_match(result, "out1.sql") - - stmt.overwrite = True - result = stmt.compile() - snapshot.assert_match(result, "out2.sql") - - -def test_load_data_partitioned(snapshot): - path = "/path/to/data" - part = {"year": 2007, "month": 7} - part_schema = ibis.schema([("year", "int32"), ("month", "int32")]) - stmt = ddl.LoadData( - "functional_alltypes", - path, - database="foo", - partition=part, - partition_schema=part_schema, - ) - - result = stmt.compile() - snapshot.assert_match(result, "out1.sql") - - stmt.overwrite = True - result = stmt.compile() - snapshot.assert_match(result, "out2.sql") - - def test_cache_table_pool_name(snapshot): statement = ddl.CacheTable("foo", database="bar") query = statement.compile() diff --git a/ibis/backends/impala/tests/test_parquet_ddl.py b/ibis/backends/impala/tests/test_parquet_ddl.py index 4cc05677c509..c67fe383d16e 100644 --- a/ibis/backends/impala/tests/test_parquet_ddl.py +++ b/ibis/backends/impala/tests/test_parquet_ddl.py @@ -42,10 +42,7 @@ def test_query_parquet_file_with_schema(con, test_data_dir): table = con.parquet_file(hdfs_path, schema=schema) - name = table._qualified_name - - # table exists - con.table(name) + assert table.get_name() in con.list_tables() expr = table.r_name.value_counts() expr.execute() diff --git a/ibis/backends/impala/tests/test_partition.py b/ibis/backends/impala/tests/test_partition.py index 52fe8a9b8bb5..ab6ff1673b25 100644 --- a/ibis/backends/impala/tests/test_partition.py +++ b/ibis/backends/impala/tests/test_partition.py @@ -1,7 +1,5 @@ from __future__ import annotations -from posixpath import join as pjoin - import pandas as pd import pandas.testing as tm import pytest @@ -39,12 +37,6 @@ def unpart_t(con, df): con.drop_table(pd_name) -def test_is_partitioned(con, temp_table): - schema = ibis.schema([("foo", "string"), ("year", "int32"), ("month", "string")]) - con.create_table(temp_table, schema=schema, partition=["year", "month"]) - assert con.table(temp_table).is_partitioned - - def test_create_table_with_partition_column(con, temp_table): schema = ibis.schema( [ @@ -69,7 +61,7 @@ def test_create_table_with_partition_column(con, temp_table): table_schema = con.get_schema(temp_table) assert_equal(table_schema, ex_schema) - partition_schema = con.table(temp_table).partition_schema() + partition_schema = con.get_partition_schema(temp_table) expected = ibis.schema([("year", "int32"), ("month", "string")]) assert_equal(partition_schema, expected) @@ -93,14 +85,14 @@ def test_create_partitioned_separate_schema(con, temp_table): table_schema = con.get_schema(temp_table) assert_equal(table_schema, ex_schema) - partition_schema = con.table(temp_table).partition_schema() + partition_schema = con.get_partition_schema(temp_table) assert_equal(partition_schema, part_schema) def test_unpartitioned_table_get_schema(con): tname = "functional_alltypes" with pytest.raises(ImpylaError): - con.table(tname).partition_schema() + con.get_partition_schema(tname) def test_insert_select_partitioned_table(con, df, temp_table, unpart_t): @@ -120,9 +112,16 @@ def test_insert_select_partitioned_table(con, df, temp_table, unpart_t): part = {"year": year, "month": month} else: part = [year, month] - part_t.insert(select_stmt, partition=part) + con.insert(temp_table, select_stmt, partition=part) + + result = part_t.execute().sort_values(by="id").reset_index(drop=True)[df.columns] - verify_partitioned_table(part_t, df, unique_keys) + tm.assert_frame_equal(result, df) + + parts = con.list_partitions(temp_table) + + # allow for the total line + assert len(parts) == len(unique_keys) + 1 @pytest.fixture @@ -151,17 +150,16 @@ def test_add_drop_partition_no_location(con, temp_table): partition=["year", "month"], tbl_properties={"transactional": "false"}, ) - table = con.table(temp_table) part = {"year": 2007, "month": 4} - table.add_partition(part) + con.add_partition(temp_table, part) - assert len(table.partitions()) == 2 + assert len(con.list_partitions(temp_table)) == 2 - table.drop_partition(part) + con.drop_partition(temp_table, part) - assert len(table.partitions()) == 1 + assert len(con.list_partitions(temp_table)) == 1 def test_add_drop_partition_owned_by_impala(con, temp_table): @@ -172,22 +170,19 @@ def test_add_drop_partition_owned_by_impala(con, temp_table): partition=["year", "month"], tbl_properties={"transactional": "false"}, ) - - table = con.table(temp_table) - part = {"year": 2007, "month": 4} subdir = util.guid() basename = util.guid() path = f"/tmp/{subdir}/{basename}" - table.add_partition(part, location=path) + con.add_partition(temp_table, part, location=path) - assert len(table.partitions()) == 2 + assert len(con.list_partitions(temp_table)) == 2 - table.drop_partition(part) + con.drop_partition(temp_table, part) - assert len(table.partitions()) == 1 + assert len(con.list_partitions(temp_table)) == 1 def test_add_drop_partition_hive_bug(con, temp_table): @@ -199,66 +194,14 @@ def test_add_drop_partition_hive_bug(con, temp_table): tbl_properties={"transactional": "false"}, ) - table = con.table(temp_table) - part = {"year": 2007, "month": 4} path = f"/tmp/{util.guid()}" - table.add_partition(part, location=path) - - assert len(table.partitions()) == 2 - - table.drop_partition(part) - - assert len(table.partitions()) == 1 - - -@pytest.mark.xfail( - raises=AttributeError, reason="test is bogus and needs to be rewritten" -) -def test_load_data_partition(con, tmp_dir, unpart_t, df, temp_table): - part_keys = ["year", "month"] - - con.create_table(temp_table, schema=unpart_t.schema(), partition=part_keys) - part_t = con.table(temp_table) - - # trim the runtime of this test - df = df[df.month == "1"].reset_index(drop=True) - - unique_keys = df[part_keys].drop_duplicates() - - hdfs_dir = pjoin(tmp_dir, "load-data-partition") - - df2 = df.drop(["year", "month"], axis="columns") - - csv_props = {"serialization.format": ",", "field.delim": ","} + con.add_partition(temp_table, part, location=path) - for i, (year, month) in enumerate(unique_keys.itertuples(index=False)): - chunk = df2[(df.year == year) & (df.month == month)] - chunk_path = pjoin(hdfs_dir, f"{i}.csv") + assert len(con.list_partitions(temp_table)) == 2 - con.write_dataframe(chunk, chunk_path) + con.drop_partition(temp_table, part) - # test both styles of insert - if i: - part = {"year": year, "month": month} - else: - part = [year, month] - - part_t.add_partition(part) - part_t.alter_partition(part, format="text", serde_properties=csv_props) - part_t.load_data(chunk_path, partition=part) - - verify_partitioned_table(part_t, df, unique_keys) - - -def verify_partitioned_table(part_t, df, unique_keys): - result = part_t.execute().sort_values(by="id").reset_index(drop=True)[df.columns] - - tm.assert_frame_equal(result, df) - - parts = part_t.partitions() - - # allow for the total line - assert len(parts) == len(unique_keys) + 1 + assert len(con.list_partitions(temp_table)) == 1 diff --git a/ibis/backends/impala/tests/test_patched.py b/ibis/backends/impala/tests/test_patched.py index df9fa679d197..36ea8268fab7 100644 --- a/ibis/backends/impala/tests/test_patched.py +++ b/ibis/backends/impala/tests/test_patched.py @@ -20,10 +20,6 @@ def test_invalidate_metadata(con, spy, test_data_db, qname): con.invalidate_metadata() spy.assert_called_with("INVALIDATE METADATA") - con.invalidate_metadata("functional_alltypes") - t = con.table("functional_alltypes") - t.invalidate_metadata() - con.invalidate_metadata("functional_alltypes", database=test_data_db) spy.assert_called_with(f"INVALIDATE METADATA {qname}") @@ -33,32 +29,34 @@ def test_refresh(con, spy, qname): con.refresh(tname) spy.assert_called_with(f"REFRESH {qname}") - t = con.table(tname) - t.refresh() - spy.assert_called_with(f"REFRESH {qname}") - def test_describe_formatted(con, spy, qname): - t = con.table("functional_alltypes") - desc = t.describe_formatted() + desc = con.describe_formatted("functional_alltypes") spy.assert_called_with(f"DESCRIBE FORMATTED {qname}") assert isinstance(desc, metadata.TableMetadata) def test_show_files(con, spy, qname): - t = con.table("functional_alltypes") - desc = t.files() + desc = con.show_files("functional_alltypes") spy.assert_called_with(f"SHOW FILES IN {qname}") assert isinstance(desc, pd.DataFrame) -def test_table_column_stats(con, spy, qname): - t = con.table("functional_alltypes") +def test_column_stats(con, spy, qname): + desc = con.column_stats("functional_alltypes") + spy.assert_called_with(f"SHOW COLUMN STATS {qname}") + assert isinstance(desc, pd.DataFrame) + - desc = t.stats() +def test_table_stats(con, spy, qname): + desc = con.table_stats("functional_alltypes") spy.assert_called_with(f"SHOW TABLE STATS {qname}") assert isinstance(desc, pd.DataFrame) - desc = t.column_stats() - spy.assert_called_with(f"SHOW COLUMN STATS {qname}") - assert isinstance(desc, pd.DataFrame) + +def test_compute_stats(con, spy, qname): + con.compute_stats("functional_alltypes") + spy.assert_called_with(f"COMPUTE STATS {qname}") + + con.compute_stats("functional_alltypes", incremental=True) + spy.assert_called_with(f"COMPUTE INCREMENTAL STATS {qname}")