Skip to content

Commit

Permalink
cfr: Add support to export systables to a tarfile
Browse files Browse the repository at this point in the history
  • Loading branch information
seut committed Jun 13, 2024
1 parent d191cb9 commit 01960de
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 6 deletions.
16 changes: 14 additions & 2 deletions cratedb_toolkit/cfr/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import click
from click_aliases import ClickAliasedGroup

from cratedb_toolkit.cfr.systable import SystemTableExporter, SystemTableImporter
from cratedb_toolkit.cfr.systable import Archive, SystemTableExporter, SystemTableImporter
from cratedb_toolkit.util.cli import (
boot_click,
error_logger,
Expand Down Expand Up @@ -46,8 +46,20 @@ def cli(ctx: click.Context, cratedb_sqlalchemy_url: str, verbose: bool, debug: b
def sys_export(ctx: click.Context, target: str):
cratedb_sqlalchemy_url = ctx.meta["cratedb_sqlalchemy_url"]
try:
stc = SystemTableExporter(dburi=cratedb_sqlalchemy_url, target=path_from_url(target))
target_path = path_from_url(target)
stc = SystemTableExporter(dburi=cratedb_sqlalchemy_url, target=target_path)

archive = None
if target_path.name.endswith(".tgz") or target_path.name.endswith(".tar.gz"):
archive = Archive(stc)

path = stc.save()

if archive is not None:
path = archive.make_tarfile()
archive.close()
logger.info(f"Created archive file {target}")

jd({"path": str(path)})
except Exception as ex:
error_logger(ctx)(ex)
Expand Down
35 changes: 31 additions & 4 deletions cratedb_toolkit/cfr/systable.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

import datetime as dt
import logging
import os
import tarfile
import tempfile
import typing as t
from pathlib import Path

Expand Down Expand Up @@ -87,19 +90,42 @@ def ddl(self, tablename_in: str, tablename_out: str, out_schema: str = None, wit
return sql


class SystemTableExporter:
class PathProvider:
    """
    Minimal holder for a filesystem location.

    The location is kept in the public `path` attribute, so other objects
    can both read it and point it somewhere else.
    """

    def __init__(self, path: t.Union[Path]):
        # Stored as-is: no normalization and no directory creation happens here.
        self.path: t.Union[Path] = path


class Archive:
    """
    Redirect a path provider into a scratch directory and pack the result
    into a gzip-compressed tarball.

    On construction, the provider's current `path` is remembered as the
    archive's target file, and the provider is re-pointed to a fresh temporary
    directory. Whatever gets written to the provider's path afterwards can be
    bundled with `make_tarfile()`.

    The temporary directory is released by `close()`. The class also works as
    a context manager, which guarantees the cleanup even when an error occurs:

        with Archive(exporter) as archive:
            exporter.save()
            archive.make_tarfile()
    """

    def __init__(self, path_provider: "PathProvider"):
        """
        :param path_provider: Object exposing a writable `path` attribute.
                              Its `path` is replaced by a temporary directory.
        """
        self.path_provider = path_provider
        self.temp_dir = tempfile.TemporaryDirectory()
        # The path originally requested by the caller becomes the archive file.
        self.target_path = self.path_provider.path
        # From now on, the provider writes into the scratch directory.
        self.path_provider.path = Path(self.temp_dir.name)

    def __enter__(self) -> "Archive":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        # Always release the scratch directory; exceptions are not suppressed.
        self.close()

    def close(self):
        """
        Remove the temporary directory and everything inside it.
        """
        self.temp_dir.cleanup()

    def make_tarfile(self) -> Path:
        """
        Pack the scratch directory into the target file.

        The scratch directory is added under its own base name, so that name
        is the top-level entry of the archive.

        :return: The path of the archive file, as originally requested.
        :raises FileExistsError: When the target file already exists; it is
                                 opened in exclusive-creation mode (`x:gz`).
        """
        source_path = self.path_provider.path
        target_path = Path(self.target_path)

        # Create missing parent directories, like the directory export does.
        target_path.parent.mkdir(parents=True, exist_ok=True)

        created = False
        try:
            with tarfile.open(target_path, "x:gz") as tar:
                # Reaching this point means this call created the file.
                created = True
                tar.add(source_path.absolute(), arcname=os.path.basename(source_path))
        except BaseException:
            # Do not leave a truncated archive behind: it would block any retry
            # because of exclusive-creation mode. A pre-existing file is never
            # touched, because `created` is still False in that case.
            if created:
                try:
                    os.remove(target_path)
                except OSError:
                    # Best-effort cleanup; the original error is what matters.
                    pass
            raise
        return self.target_path


class SystemTableExporter(PathProvider):
"""
Export schema and data from CrateDB system tables.
"""

def __init__(self, dburi: str, target: t.Union[Path], data_format: DataFormat = "jsonl"):
super().__init__(target)
self.dburi = dburi
self.target = target
self.data_format = data_format
self.adapter = DatabaseAdapter(dburi=self.dburi)
self.info = InfoContainer(adapter=self.adapter)
self.inspector = SystemTableInspector(dburi=self.dburi)
self.target.mkdir(exist_ok=True, parents=True)

def read_table(self, tablename: str) -> pl.DataFrame:
sql = f'SELECT * FROM "{SystemTableKnowledge.SYS_SCHEMA}"."{tablename}"' # noqa: S608
Expand All @@ -123,8 +149,9 @@ def dump_table(self, frame: pl.DataFrame, file: t.Union[t.TextIO, None] = None):
raise NotImplementedError(f"Output format not implemented: {self.data_format}")

def save(self) -> Path:
self.path.mkdir(exist_ok=True, parents=True)
timestamp = dt.datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
path = self.target / self.info.cluster_name / timestamp / "sys"
path = self.path / self.info.cluster_name / timestamp / "sys"
logger.info(f"Exporting system tables to: {path}")
system_tables = self.inspector.table_names()
path_schema = path / ExportSettings.SCHEMA_PATH
Expand Down
40 changes: 40 additions & 0 deletions tests/cfr/test_cli.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import json
import os.path
import re
import shutil
import sys
import tarfile

import tests

Expand Down Expand Up @@ -49,6 +51,44 @@ def test_cfr_cli_export_success(cratedb, tmp_path, caplog):
assert len(data_files) >= 10


def test_cfr_cli_export_to_archive_file(cratedb, tmp_path, caplog):
    """
    Exporting to a `.tgz` target produces a tarball with schema and data files.
    """

    archive_target = os.path.join(tmp_path, "cluster-data.tgz")

    # Run `sys-export` against the archive target.
    environment = {"CRATEDB_SQLALCHEMY_URL": cratedb.database.dburi, "CFR_TARGET": str(tmp_path)}
    outcome = CliRunner(env=environment).invoke(
        cli,
        args=f"--debug sys-export {archive_target}",
        catch_exceptions=False,
    )
    assert outcome.exit_code == 0

    # The export must have been announced and reported as successful.
    assert "Exporting system tables to" in caplog.text
    assert re.search(r"Successfully exported \d+ system tables", caplog.text), "Log message missing"

    # The command reports the archive file it created.
    archive_path = Path(json.loads(outcome.output)["path"])
    assert "cluster-data.tgz" in archive_path.name

    with tarfile.open(archive_path, "r") as tar:
        member_names = tar.getnames()

    # A member mentioning "data" counts as a data file; only the remaining
    # members are checked for "schema".
    data_files = [name for name in member_names if "data" in name]
    schema_files = [name for name in member_names if "data" not in name and "schema" in name]

    assert len(schema_files) >= 19
    assert len(data_files) >= 10


def test_cfr_cli_export_failure(cratedb, tmp_path, caplog):
"""
Verify `ctk cfr sys-export` failure.
Expand Down

0 comments on commit 01960de

Please sign in to comment.