diff --git a/ci/schema/databricks.sql b/ci/schema/databricks.sql index a2c1bb3fb610..1e5dbf16bf2c 100644 --- a/ci/schema/databricks.sql +++ b/ci/schema/databricks.sql @@ -1,17 +1,17 @@ CREATE VIEW IF NOT EXISTS diamonds AS -SELECT * FROM parquet.`/Volumes/ibis_testing/default/testing_data/parquet/diamonds.parquet`; +SELECT * FROM parquet.`/Volumes/ibis_testing/default/{user}_{python_version}/diamonds.parquet`; CREATE VIEW IF NOT EXISTS batting AS -SELECT * FROM parquet.`/Volumes/ibis_testing/default/testing_data/parquet/batting.parquet`; +SELECT * FROM parquet.`/Volumes/ibis_testing/default/{user}_{python_version}/batting.parquet`; CREATE VIEW IF NOT EXISTS awards_players AS -SELECT * FROM parquet.`/Volumes/ibis_testing/default/testing_data/parquet/awards_players.parquet`; +SELECT * FROM parquet.`/Volumes/ibis_testing/default/{user}_{python_version}/awards_players.parquet`; CREATE VIEW IF NOT EXISTS functional_alltypes AS -SELECT * FROM parquet.`/Volumes/ibis_testing/default/testing_data/parquet/functional_alltypes.parquet`; +SELECT * FROM parquet.`/Volumes/ibis_testing/default/{user}_{python_version}/functional_alltypes.parquet`; CREATE VIEW IF NOT EXISTS astronauts AS -SELECT * FROM parquet.`/Volumes/ibis_testing/default/testing_data/parquet/astronauts.parquet`; +SELECT * FROM parquet.`/Volumes/ibis_testing/default/{user}_{python_version}/astronauts.parquet`; CREATE TABLE IF NOT EXISTS `array_types` AS VALUES (ARRAY(CAST(1 AS BIGINT), 2, 3), ARRAY('a', 'b', 'c'), ARRAY(1.0, 2.0, 3.0), 'a', 1.0, ARRAY(ARRAY(), ARRAY(CAST(1 AS BIGINT), 2, 3), NULL)), diff --git a/ibis/backends/databricks/__init__.py b/ibis/backends/databricks/__init__.py index 7b7bd4feca7d..b50740fb9b58 100644 --- a/ibis/backends/databricks/__init__.py +++ b/ibis/backends/databricks/__init__.py @@ -4,7 +4,9 @@ import contextlib import functools +import getpass import os +import sys import tempfile from pathlib import Path from typing import TYPE_CHECKING, Any @@ -314,7 +316,7 @@ def do_connect( catalog: str | None = None, schema: str = "default", use_cloud_fetch: bool = False, - memtable_volume: str | None = "__ibis_memtables__", + memtable_volume: str | None = None, staging_allowed_local_path: str | None = None, **config: Any, ) -> None: @@ -339,6 +341,11 @@ def do_connect( staging_allowed_local_path=staging_allowed_local_path, **config, ) + if memtable_volume is None: + short_version = "".join(map(str, sys.version_info[:3])) + memtable_volume = ( + f"{getpass.getuser()}-py={short_version}-pid={os.getpid()}" + ) self._memtable_volume = memtable_volume self._memtable_catalog = self.current_catalog self._memtable_database = self.current_database @@ -352,7 +359,9 @@ def begin(self): @util.experimental @classmethod def from_connection( - cls, con, memtable_volume: str = "__ibis_memtables__" + cls, + con, + memtable_volume: str | None = None, ) -> Backend: """Create an Ibis client from an existing connection to a Databricks cloud instance. @@ -370,7 +379,7 @@ def from_connection( return new_backend def _post_connect(self, *, memtable_volume: str) -> None: - sql = f"CREATE VOLUME IF NOT EXISTS {memtable_volume} COMMENT 'Ibis memtable storage volume'" + sql = f"CREATE VOLUME IF NOT EXISTS `{memtable_volume}` COMMENT 'Ibis memtable storage volume'" with self.con.cursor() as cur: cur.execute(sql) diff --git a/ibis/backends/databricks/tests/conftest.py b/ibis/backends/databricks/tests/conftest.py index 5ad70043e2d4..f63d5a59c7b2 100644 --- a/ibis/backends/databricks/tests/conftest.py +++ b/ibis/backends/databricks/tests/conftest.py @@ -1,6 +1,8 @@ from __future__ import annotations import concurrent.futures +import getpass +import sys from os import environ as env from typing import TYPE_CHECKING, Any @@ -25,25 +27,41 @@ def _load_data(self, **_: Any) -> None: import databricks.sql files = list(self.data_dir.joinpath("parquet").glob("*.parquet")) - volume_prefix = "/Volumes/ibis_testing/default/testing_data/parquet" - with ( - concurrent.futures.ThreadPoolExecutor() as exe, - databricks.sql.connect( - server_hostname=env["DATABRICKS_SERVER_HOSTNAME"], - http_path=env["DATABRICKS_HTTP_PATH"], - access_token=env["DATABRICKS_TOKEN"], - staging_allowed_local_path=str(self.data_dir), - ) as con, - ): - for fut in concurrent.futures.as_completed( - exe.submit( - put_into, - con, - f"PUT '{file}' INTO '{volume_prefix}/{file.name}' OVERWRITE", + + user = getpass.getuser() + python_version = "".join(map(str, sys.version_info[:3])) + volume = f"{user}_{python_version}" + volume_prefix = f"/Volumes/ibis_testing/default/{volume}" + + with databricks.sql.connect( + server_hostname=env["DATABRICKS_SERVER_HOSTNAME"], + http_path=env["DATABRICKS_HTTP_PATH"], + access_token=env["DATABRICKS_TOKEN"], + staging_allowed_local_path=str(self.data_dir), + ) as con: + with con.cursor() as cur: + cur.execute( + f"CREATE VOLUME IF NOT EXISTS ibis_testing.default.{volume} COMMENT 'Ibis test data storage'" ) - for file in files - ): - fut.result() + with concurrent.futures.ThreadPoolExecutor() as exe: + for fut in concurrent.futures.as_completed( + exe.submit( + put_into, + con, + f"PUT '{file}' INTO '{volume_prefix}/{file.name}' OVERWRITE", + ) + for file in files + ): + fut.result() + + with con.cursor() as cur: + for raw_stmt in self.ddl_script: + try: + stmt = raw_stmt.format(user=user, python_version=python_version) + except KeyError: # not a valid format string, just execute it + stmt = raw_stmt + + cur.execute(stmt) @staticmethod def connect(*, tmpdir, worker_id, **kw) -> BaseBackend: