Skip to content
This repository has been archived by the owner on Jul 16, 2024. It is now read-only.

Commit

Permalink
Support installing Python packages from client (#218)
Browse files Browse the repository at this point in the history
On servers that have no Internet access, it is hard to install the
required Python packages for a UDF.

To mitigate this issue, this patch enables installing packages from the
client side. This works as follows:

1. Download the required packages to a directory on client with `pip
download`
2. Archive and upload the downloaded packages to server
3. Install the uploaded packages with `pip install --find-links`.

Please note that this only install the packages on a single server host.
To make the packages available to other hosts, an NFS on the server side
might be helpful.

This patch also fixes a bug to enable UDF arguments to use SQL keywords
as the names by double-quoting them.
  • Loading branch information
xuebinsu authored Oct 2, 2023
1 parent e94667e commit 66577de
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 14 deletions.
8 changes: 8 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ jobs:
POSTGRES_DB: ${{ env.TESTDB }}
ports:
- 5432:5432
options: --name postgres

steps:
- uses: actions/checkout@v2
Expand All @@ -59,6 +60,13 @@ jobs:
run: |
pip install tox==3.25.0
- name: List all containers
run: docker ps -a

- name: Install Python in PostgreSQL server container
run: |
docker exec --user 0 postgres sh -c 'apt-get update && apt-get install -y python3-pip && mkdir -p $(su --login postgres --session-command "python3 -m site --user-site") && chown -R postgres /var/lib/postgresql'
- name: Run Tests
run: |
export POSTGRES_PASSWORD=${{ env.PGPASSWORD }}
Expand Down
22 changes: 22 additions & 0 deletions greenplumpython/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,28 @@ def assign(self, **new_columns: Callable[[], Any]) -> "DataFrame":
targets.append(f"{_serialize_to_expr(v, db=self)} AS {k}")
return DataFrame(f"SELECT {','.join(targets)}", db=self)

# Add interface here for language servers.
#
# FIXME: Would be better to return something to inform that whether the
# operaton succeeded.
def install_packages(self, requirements: str) -> None:
"""
Install the required Python packages on the server host.
The packages will be installed to the currently activated Python
environment.
Args:
requirements: specification of what packages are required to be
installed. The format is the same as `the rquirements file
<https://pip.pypa.io/en/stable/reference/requirements-file-format/>`_
used by :code:`pip`. It can be obtained by reading an existing
requirements file in text mode.
"""
raise NotImplementedError(
"Please import greenplumpython.experimental.file to load the implementation."
)


def database(uri: Optional[str] = None, params: Dict[str, Optional[str]] = {}) -> Database:
"""
Expand Down
101 changes: 88 additions & 13 deletions greenplumpython/experimental/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,27 @@ def _dump_file_chunk(tmp_archive_name: str, chunk_base64: str) -> int:


@gp.create_function
def _extract_files(tmp_archive_name: str) -> list[str]:
def _extract_files(tmp_archive_name: str, returning: str) -> list[str]:
tmp_archive_base = pathlib.Path("/") / "tmp" / tmp_archive_name
tmp_archive_path = tmp_archive_base / f"{tmp_archive_name}.tar.gz"
extracted_dir = tmp_archive_base / "extracted"
if not extracted_dir.exists():
extracted_root = tmp_archive_base / "extracted"
if not extracted_root.exists():
with tarfile.open(tmp_archive_path, "r:gz") as tmp_archive:
extracted_dir.mkdir()
tmp_archive.extractall(str(extracted_dir))
extracted_root.mkdir()
tmp_archive.extractall(str(extracted_root))
tmp_archive_path.unlink()
for path in extracted_dir.rglob("*"):
if path.is_file() and not path.is_symlink():
yield str(path.resolve())
if returning == "root":
yield str(extracted_root.resolve())
else:
assert returning == "files"
for path in extracted_root.rglob("*"):
if path.is_file() and not path.is_symlink():
yield str(path.resolve())


@classmethod
def _from_files(_, files: list[str], parser: NormalFunction, db: gp.Database) -> gp.DataFrame:
tmp_archive_name = f"tar_{uuid.uuid4().hex}"
def _archive_and_upload(tmp_archive_name: str, files: list[str], db: gp.Database):
tmp_archive_base = pathlib.Path("/") / "tmp" / tmp_archive_name
tmp_archive_base.mkdir()
tmp_archive_base.mkdir(exist_ok=True)
tmp_archive_path = tmp_archive_base / f"{tmp_archive_name}.tar.gz"
with tarfile.open(tmp_archive_path, "w:gz") as tmp_archive:
for file_path in files:
Expand Down Expand Up @@ -71,11 +73,84 @@ def _from_files(_, files: list[str], parser: NormalFunction, db: gp.Database) ->
ORDER BY id;
"""
)


@classmethod
def _from_files(_, files: list[str], parser: NormalFunction, db: gp.Database) -> gp.DataFrame:
tmp_archive_name = f"tar_{uuid.uuid4().hex}"
_archive_and_upload(tmp_archive_name, files, db)
func_sig = inspect.signature(parser.unwrap())
result_members = get_type_hints(func_sig.return_annotation)
return db.apply(
lambda: parser(_extract_files(tmp_archive_name)), expand=len(result_members) == 0
lambda: parser(_extract_files(tmp_archive_name, "files")),
expand=len(result_members) == 0,
)


setattr(gp.DataFrame, "from_files", _from_files)


import subprocess as sp
import sys


@gp.create_function
def _install_on_server(pkg_dir: str, requirements: str) -> str:
import subprocess as sp
import sys

assert sys.executable, "Python executable is required to install packages."
cmd = [
sys.executable,
"-m",
"pip",
"install",
"--no-index",
"--requirement",
"/dev/stdin",
"--find-links",
pkg_dir,
]
try:
output = sp.check_output(cmd, text=True, stderr=sp.STDOUT, input=requirements)
return output
except sp.CalledProcessError as e:
raise Exception(e.stdout)


def _install_packages(db: gp.Database, requirements: str):
tmp_archive_name = f"tar_{uuid.uuid4().hex}"
# FIXME: Windows client is not supported yet.
local_dir = pathlib.Path("/") / "tmp" / tmp_archive_name / "pip"
local_dir.mkdir(parents=True)
cmd = [
sys.executable,
"-m",
"pip",
"download",
"--requirement",
"/dev/stdin",
"--dest",
local_dir,
]
try:
sp.check_output(cmd, text=True, stderr=sp.STDOUT, input=requirements)
except sp.CalledProcessError as e:
raise e from Exception(e.stdout)
_archive_and_upload(tmp_archive_name, [local_dir.resolve()], db)
extracted = db.apply(lambda: _extract_files(tmp_archive_name, "root"), column_name="cache_dir")
assert len(list(extracted)) == 1
server_dir = (
pathlib.Path("/")
/ "tmp"
/ tmp_archive_name
/ "extracted"
/ local_dir.relative_to(local_dir.root)
)
installed = extracted.apply(
lambda _: _install_on_server(server_dir.as_uri(), requirements), column_name="result"
)
assert len(list(installed)) == 1


setattr(gp.Database, "install_packages", _install_packages)
2 changes: 1 addition & 1 deletion greenplumpython/func.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ def _serialize(self, db: Database) -> str:
func_sig = inspect.signature(self._wrapped_func)
func_args = ",".join(
[
f"{param.name} {_serialize_to_type(param.annotation, db=db)}"
f'"{param.name}" {_serialize_to_type(param.annotation, db=db)}'
for param in func_sig.parameters.values()
]
)
Expand Down
51 changes: 51 additions & 0 deletions tests/test_file.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import subprocess as sp
import sys
from dataclasses import dataclass
from uuid import uuid4

Expand Down Expand Up @@ -41,3 +43,52 @@ def test_csv_multi_chunks(db: gp.Database):
assert greenplumpython.experimental.file._CHUNK_SIZE == 3
test_csv_single_chunk(db)
greenplumpython.experimental.file._CHUNK_SIZE = default_chunk_size


import subprocess
import sys


@gp.create_function
def pip_show(pkg_name: str) -> str:
cmd = [
sys.executable,
"-m",
"pip",
"show",
pkg_name,
]
try:
return subprocess.check_output(cmd, text=True, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
raise e from Exception(e.stdout)


@gp.create_function
def sys_path() -> list[str]:
return sys.path


def test_intall_pacakges(db: gp.Database):
print(db.install_packages("faker==19.6.1"))
print(db.apply(lambda: pip_show("faker"), column_name="pip_show"))
print(db.apply(lambda: sys_path(), column_name="sys_path"))

@gp.create_function
def fake_name() -> str:
from faker import Faker # type: ignore reportMissingImports

fake = Faker()
return fake.name()

assert len(list(db.apply(lambda: fake_name()))) == 1

try:
subprocess.check_output(
[sys.executable, "-m", "pip", "uninstall", "faker"],
text=True,
stderr=subprocess.STDOUT,
input="y",
)
except subprocess.CalledProcessError as e:
raise e from Exception(e.stdout)

0 comments on commit 66577de

Please sign in to comment.