Skip to content

Commit

Permalink
feat(databricks): isolate volume creation (#10432)
Browse files Browse the repository at this point in the history
  • Loading branch information
cpcloud authored Nov 5, 2024
1 parent 291bb69 commit 3cdccfb
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 26 deletions.
10 changes: 5 additions & 5 deletions ci/schema/databricks.sql
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
CREATE VIEW IF NOT EXISTS diamonds AS
SELECT * FROM parquet.`/Volumes/ibis_testing/default/testing_data/parquet/diamonds.parquet`;
SELECT * FROM parquet.`/Volumes/ibis_testing/default/{user}_{python_version}/diamonds.parquet`;

CREATE VIEW IF NOT EXISTS batting AS
SELECT * FROM parquet.`/Volumes/ibis_testing/default/testing_data/parquet/batting.parquet`;
SELECT * FROM parquet.`/Volumes/ibis_testing/default/{user}_{python_version}/batting.parquet`;

CREATE VIEW IF NOT EXISTS awards_players AS
SELECT * FROM parquet.`/Volumes/ibis_testing/default/testing_data/parquet/awards_players.parquet`;
SELECT * FROM parquet.`/Volumes/ibis_testing/default/{user}_{python_version}/awards_players.parquet`;

CREATE VIEW IF NOT EXISTS functional_alltypes AS
SELECT * FROM parquet.`/Volumes/ibis_testing/default/testing_data/parquet/functional_alltypes.parquet`;
SELECT * FROM parquet.`/Volumes/ibis_testing/default/{user}_{python_version}/functional_alltypes.parquet`;

CREATE VIEW IF NOT EXISTS astronauts AS
SELECT * FROM parquet.`/Volumes/ibis_testing/default/testing_data/parquet/astronauts.parquet`;
SELECT * FROM parquet.`/Volumes/ibis_testing/default/{user}_{python_version}/astronauts.parquet`;

CREATE TABLE IF NOT EXISTS `array_types` AS
VALUES (ARRAY(CAST(1 AS BIGINT), 2, 3), ARRAY('a', 'b', 'c'), ARRAY(1.0, 2.0, 3.0), 'a', 1.0, ARRAY(ARRAY(), ARRAY(CAST(1 AS BIGINT), 2, 3), NULL)),
Expand Down
15 changes: 12 additions & 3 deletions ibis/backends/databricks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

import contextlib
import functools
import getpass
import os
import sys
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING, Any
Expand Down Expand Up @@ -314,7 +316,7 @@ def do_connect(
catalog: str | None = None,
schema: str = "default",
use_cloud_fetch: bool = False,
memtable_volume: str | None = "__ibis_memtables__",
memtable_volume: str | None = None,
staging_allowed_local_path: str | None = None,
**config: Any,
) -> None:
Expand All @@ -339,6 +341,11 @@ def do_connect(
staging_allowed_local_path=staging_allowed_local_path,
**config,
)
if memtable_volume is None:
short_version = "".join(map(str, sys.version_info[:3]))
memtable_volume = (
f"{getpass.getuser()}-py={short_version}-pid={os.getpid()}"
)
self._memtable_volume = memtable_volume
self._memtable_catalog = self.current_catalog
self._memtable_database = self.current_database
Expand All @@ -352,7 +359,9 @@ def begin(self):
@util.experimental
@classmethod
def from_connection(
cls, con, memtable_volume: str = "__ibis_memtables__"
cls,
con,
memtable_volume: str | None = None,
) -> Backend:
"""Create an Ibis client from an existing connection to a Databricks cloud instance.
Expand All @@ -370,7 +379,7 @@ def from_connection(
return new_backend

def _post_connect(self, *, memtable_volume: str) -> None:
    """Create the volume used for memtable storage if it does not exist.

    Parameters
    ----------
    memtable_volume
        Name of the volume to create. It is emitted as a backtick-delimited
        identifier, so it may contain characters such as ``-`` or ``=``.
    """
    # Double any embedded backtick so the name cannot terminate the
    # delimited identifier early and produce malformed SQL.
    quoted = memtable_volume.replace("`", "``")
    sql = f"CREATE VOLUME IF NOT EXISTS `{quoted}` COMMENT 'Ibis memtable storage volume'"
    with self.con.cursor() as cur:
        cur.execute(sql)

Expand Down
54 changes: 36 additions & 18 deletions ibis/backends/databricks/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import annotations

import concurrent.futures
import getpass
import sys
from os import environ as env
from typing import TYPE_CHECKING, Any

Expand All @@ -25,25 +27,41 @@ def _load_data(self, **_: Any) -> None:
import databricks.sql

files = list(self.data_dir.joinpath("parquet").glob("*.parquet"))
volume_prefix = "/Volumes/ibis_testing/default/testing_data/parquet"
with (
concurrent.futures.ThreadPoolExecutor() as exe,
databricks.sql.connect(
server_hostname=env["DATABRICKS_SERVER_HOSTNAME"],
http_path=env["DATABRICKS_HTTP_PATH"],
access_token=env["DATABRICKS_TOKEN"],
staging_allowed_local_path=str(self.data_dir),
) as con,
):
for fut in concurrent.futures.as_completed(
exe.submit(
put_into,
con,
f"PUT '{file}' INTO '{volume_prefix}/{file.name}' OVERWRITE",

user = getpass.getuser()
python_version = "".join(map(str, sys.version_info[:3]))
volume = f"{user}_{python_version}"
volume_prefix = f"/Volumes/ibis_testing/default/{volume}"

with databricks.sql.connect(
server_hostname=env["DATABRICKS_SERVER_HOSTNAME"],
http_path=env["DATABRICKS_HTTP_PATH"],
access_token=env["DATABRICKS_TOKEN"],
staging_allowed_local_path=str(self.data_dir),
) as con:
with con.cursor() as cur:
cur.execute(
f"CREATE VOLUME IF NOT EXISTS ibis_testing.default.{volume} COMMENT 'Ibis test data storage'"
)
for file in files
):
fut.result()
with concurrent.futures.ThreadPoolExecutor() as exe:
for fut in concurrent.futures.as_completed(
exe.submit(
put_into,
con,
f"PUT '{file}' INTO '{volume_prefix}/{file.name}' OVERWRITE",
)
for file in files
):
fut.result()

with con.cursor() as cur:
for raw_stmt in self.ddl_script:
try:
stmt = raw_stmt.format(user=user, python_version=python_version)
except KeyError: # not a valid format string, just execute it
stmt = raw_stmt

cur.execute(stmt)

@staticmethod
def connect(*, tmpdir, worker_id, **kw) -> BaseBackend:
Expand Down

0 comments on commit 3cdccfb

Please sign in to comment.