Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cfr: Add support to directly export to an archive file #153

Merged
merged 1 commit into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions cratedb_toolkit/cfr/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import click
from click_aliases import ClickAliasedGroup

from cratedb_toolkit.cfr.systable import SystemTableExporter, SystemTableImporter
from cratedb_toolkit.cfr.systable import Archive, SystemTableExporter, SystemTableImporter
from cratedb_toolkit.util.cli import (
boot_click,
error_logger,
Expand Down Expand Up @@ -46,8 +46,20 @@ def cli(ctx: click.Context, cratedb_sqlalchemy_url: str, verbose: bool, debug: b
def sys_export(ctx: click.Context, target: str):
cratedb_sqlalchemy_url = ctx.meta["cratedb_sqlalchemy_url"]
try:
stc = SystemTableExporter(dburi=cratedb_sqlalchemy_url, target=path_from_url(target))
target_path = path_from_url(target)
stc = SystemTableExporter(dburi=cratedb_sqlalchemy_url, target=target_path)

archive = None
if target_path.name.endswith(".tgz") or target_path.name.endswith(".tar.gz"):
archive = Archive(stc)

path = stc.save()

if archive is not None:
path = archive.make_tarfile()
archive.close()
logger.info(f"Created archive file {target}")
seut marked this conversation as resolved.
Show resolved Hide resolved

jd({"path": str(path)})
except Exception as ex:
error_logger(ctx)(ex)
Expand Down
35 changes: 31 additions & 4 deletions cratedb_toolkit/cfr/systable.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

import datetime as dt
import logging
import os
import tarfile
import tempfile
import typing as t
from pathlib import Path

Expand Down Expand Up @@ -87,19 +90,42 @@ def ddl(self, tablename_in: str, tablename_out: str, out_schema: str = None, wit
return sql


class SystemTableExporter:
class PathProvider:
    """
    Minimal holder for a filesystem location.

    The location is kept on the public, reassignable `path` attribute.
    """

    def __init__(self, path: t.Union[Path]):
        # Stored as given, without any validation or normalization.
        self.path = path


class Archive:
    """
    Stage a path provider's output in a temporary directory and pack it into a tarball.

    On construction, the provider's current `path` is remembered as the archive
    target, and the provider is redirected to a fresh temporary directory.
    `make_tarfile()` then writes the staged content to the remembered target.

    The instance can be used as a context manager, which guarantees that the
    temporary directory is removed even when staging or packing fails:

        with Archive(provider) as archive:
            ...  # let the provider write into `provider.path`
            archive.make_tarfile()
    """

    def __init__(self, path_provider: "PathProvider"):
        self.path_provider = path_provider
        self.temp_dir = tempfile.TemporaryDirectory()
        # Remember the requested archive location before redirecting the provider.
        self.target_path = self.path_provider.path
        self.path_provider.path = Path(self.temp_dir.name)

    def __enter__(self) -> "Archive":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        # Returning None lets any in-flight exception propagate after cleanup.
        self.close()

    def close(self):
        """
        Remove the temporary staging directory and everything in it.
        """
        self.temp_dir.cleanup()

    def make_tarfile(self) -> Path:
        """
        Pack the provider's current directory into a gzip-compressed tarball.

        The archive is opened in exclusive-creation mode (`x:gz`), so an
        already existing target file is never overwritten.

        NOTE(review): the top-level entry inside the archive is named after the
        staging directory, i.e. the temporary directory's basename.

        :return: The path of the archive file.
        """
        source_path = self.path_provider.path
        # The archive may be requested in a directory which does not exist yet.
        Path(self.target_path).parent.mkdir(parents=True, exist_ok=True)
        with tarfile.open(self.target_path, "x:gz") as tar:
            tar.add(source_path.absolute(), arcname=source_path.name)
        return self.target_path


class SystemTableExporter(PathProvider):
"""
Export schema and data from CrateDB system tables.
"""

def __init__(self, dburi: str, target: t.Union[Path], data_format: DataFormat = "jsonl"):
super().__init__(target)
self.dburi = dburi
self.target = target
self.data_format = data_format
self.adapter = DatabaseAdapter(dburi=self.dburi)
self.info = InfoContainer(adapter=self.adapter)
self.inspector = SystemTableInspector(dburi=self.dburi)
self.target.mkdir(exist_ok=True, parents=True)

def read_table(self, tablename: str) -> pl.DataFrame:
sql = f'SELECT * FROM "{SystemTableKnowledge.SYS_SCHEMA}"."{tablename}"' # noqa: S608
Expand All @@ -123,8 +149,9 @@ def dump_table(self, frame: pl.DataFrame, file: t.Union[t.TextIO, None] = None):
raise NotImplementedError(f"Output format not implemented: {self.data_format}")

def save(self) -> Path:
self.path.mkdir(exist_ok=True, parents=True)
timestamp = dt.datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
path = self.target / self.info.cluster_name / timestamp / "sys"
path = self.path / self.info.cluster_name / timestamp / "sys"
logger.info(f"Exporting system tables to: {path}")
system_tables = self.inspector.table_names()
path_schema = path / ExportSettings.SCHEMA_PATH
Expand Down
40 changes: 40 additions & 0 deletions tests/cfr/test_cli.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import json
import os.path
import re
import shutil
import sys
import tarfile

import tests

Expand Down Expand Up @@ -49,6 +51,44 @@ def test_cfr_cli_export_success(cratedb, tmp_path, caplog):
assert len(data_files) >= 10


def test_cfr_cli_export_to_archive_file(cratedb, tmp_path, caplog):
    """
    Exporting to a `.tgz` target yields a tarball, as in `ctk cfr sys-export some-file.tgz`.
    """

    archive_file = os.path.join(tmp_path, "cluster-data.tgz")

    # Run the subcommand against the test database.
    environment = {"CRATEDB_SQLALCHEMY_URL": cratedb.database.dburi, "CFR_TARGET": str(tmp_path)}
    outcome = CliRunner(env=environment).invoke(
        cli,
        args=f"--debug sys-export {archive_file}",
        catch_exceptions=False,
    )
    assert outcome.exit_code == 0

    # The export must have been logged.
    assert "Exporting system tables to" in caplog.text
    assert re.search(r"Successfully exported \d+ system tables", caplog.text), "Log message missing"

    # The reported path must name the requested archive file.
    path = Path(json.loads(outcome.output)["path"])
    assert "cluster-data.tgz" in path.name

    # Classify archive members: "data" takes precedence over "schema".
    with tarfile.open(path, "r") as tar:
        members = tar.getnames()
    data_files = [name for name in members if "data" in name]
    schema_files = [name for name in members if "data" not in name and "schema" in name]

    assert len(schema_files) >= 19
    assert len(data_files) >= 10


def test_cfr_cli_export_failure(cratedb, tmp_path, caplog):
"""
Verify `ctk cfr sys-export` failure.
Expand Down