Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CFR and WTF diagnostics programs #88

Merged
merged 15 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ jobs:
pip install "setuptools>=64" --upgrade

# Install package in editable mode.
pip install --use-pep517 --prefer-binary --editable=.[cloud,datasets,io,test,develop]
pip install --use-pep517 --prefer-binary --editable=.[full,test,develop]

- name: Run linter and software tests
env:
Expand Down
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
.venv*
__pycache__
dist
build
.coverage*
coverage.xml
/cfr
/doc/_build
/tmp
DOWNLOAD
/DOWNLOAD
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@


## Unreleased
- Add `ctk cfr` and `ctk wtf` diagnostics programs
- Remove support for Python 3.7

## 2024/06/11 v0.0.13
- Dependencies: Migrate from `crate[sqlalchemy]` to `sqlalchemy-cratedb`

Expand Down
Empty file added cratedb_toolkit/cfr/__init__.py
Empty file.
67 changes: 67 additions & 0 deletions cratedb_toolkit/cfr/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Copyright (c) 2021-2024, Crate.io Inc.
# Distributed under the terms of the AGPLv3 license, see LICENSE.
import logging
import sys

import click
from click_aliases import ClickAliasedGroup

from cratedb_toolkit.cfr.systable import SystemTableExporter, SystemTableImporter
from cratedb_toolkit.util.cli import (
boot_click,
error_logger,
make_command,
)
from cratedb_toolkit.util.data import jd, path_from_url

logger = logging.getLogger(__name__)


# Reusable `--cratedb-sqlalchemy-url` option. The value is optional on the
# command line and may also be supplied through the `CRATEDB_SQLALCHEMY_URL`
# environment variable.
cratedb_sqlalchemy_option = click.option(
    "--cratedb-sqlalchemy-url", envvar="CRATEDB_SQLALCHEMY_URL", type=str, required=False, help="CrateDB SQLAlchemy URL"
)


@click.group(cls=ClickAliasedGroup)  # type: ignore[arg-type]
@cratedb_sqlalchemy_option
@click.option("--verbose", is_flag=True, required=False, help="Turn on logging")
@click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level")
@click.option("--scrub", envvar="SCRUB", is_flag=True, required=False, help="Blank out identifiable information")
@click.version_option()
@click.pass_context
def cli(ctx: click.Context, cratedb_sqlalchemy_url: str, verbose: bool, debug: bool, scrub: bool):
    """
    Diagnostics and informational utilities.
    """
    # The subcommands read the database address from the context, so there is
    # nothing useful to do when it is missing: report and terminate.
    if not cratedb_sqlalchemy_url:
        logger.error("Unable to operate without database address")
        sys.exit(1)

    # Publish the group-level settings to subcommands via the context's `meta` dictionary.
    ctx.meta["cratedb_sqlalchemy_url"] = cratedb_sqlalchemy_url
    ctx.meta["scrub"] = scrub

    return boot_click(ctx, verbose, debug)


@make_command(cli, "sys-export")
@click.argument("target", envvar="CFR_TARGET", type=str, required=False, default="file://./cfr")
@click.pass_context
def sys_export(ctx: click.Context, target: str):
    # Export the system tables of the database addressed by the group-level
    # option into `target`, then report the directory that was written via `jd`.
    dburi = ctx.meta["cratedb_sqlalchemy_url"]
    try:
        exporter = SystemTableExporter(dburi=dburi, target=path_from_url(target))
        export_path = exporter.save()
        jd({"path": str(export_path)})
    except Exception as ex:
        # Command boundary: log the failure and signal it through the exit code.
        error_logger(ctx)(ex)
        sys.exit(1)


@make_command(cli, "sys-import")
@click.argument("source", envvar="CFR_SOURCE", type=str, required=True)
@click.pass_context
def sys_import(ctx: click.Context, source: str):
    # Load a previously exported set of system tables from `source` into the
    # database addressed by the group-level option.
    dburi = ctx.meta["cratedb_sqlalchemy_url"]
    try:
        importer = SystemTableImporter(dburi=dburi, source=path_from_url(source))
        importer.load()
    except Exception as ex:
        # Command boundary: log the failure and signal it through the exit code.
        error_logger(ctx)(ex)
        sys.exit(1)
230 changes: 230 additions & 0 deletions cratedb_toolkit/cfr/systable.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
"""
CrateDB Diagnostics: System Tables Exporter and Importer.

Schemas and results of the following queries should be included:
```sql
SELECT * FROM sys.cluster
SELECT * FROM sys.nodes
SELECT * FROM sys.shards
SELECT * FROM sys.allocations
SELECT * FROM sys.jobs_log
SELECT * FROM sys.operations_log
```

https://cratedb.com/docs/python/en/latest/by-example/sqlalchemy/inspection-reflection.html
https://docs.sqlalchemy.org/en/20/faq/metadata_schema.html#how-can-i-get-the-create-table-drop-table-output-as-a-string
"""

import datetime as dt
import logging
import typing as t
from pathlib import Path

import polars as pl
import sqlalchemy as sa
from tqdm import tqdm

from cratedb_toolkit.sqlalchemy.patch import patch_encoder
from cratedb_toolkit.util import DatabaseAdapter
from cratedb_toolkit.util.cli import error_logger
from cratedb_toolkit.wtf.core import InfoContainer

logger = logging.getLogger(__name__)


# Names of the data file formats that can be selected through the
# `data_format` argument of the exporter and importer classes in this module.
DataFormat = t.Literal["csv", "jsonl", "ndjson", "parquet"]


class SystemTableKnowledge:
    """
    Constants describing CrateDB internals that this module depends on.
    """

    # The schema in which CrateDB keeps its system tables.
    SYS_SCHEMA = "sys"

    # Tables to leave out when reflecting the system schema.
    # TODO: Reflecting the `summits` table raises an error.
    #       AttributeError: 'UserDefinedType' object has no attribute 'get_col_spec'
    REFLECTION_BLOCKLIST = ["summits"]


class ExportSettings:
    """
    Constants describing the on-disk layout of a system table export.
    """

    # Names of the subdirectories holding schema files and data files, respectively.
    SCHEMA_PATH = "schema"
    DATA_PATH = "data"

    # Prefix put in front of the table name to form file names on disk.
    TABLE_FILENAME_PREFIX = "sys-"


class SystemTableInspector:
    """
    Reflect schema information from CrateDB system tables.

    The inspector connects through a `DatabaseAdapter` created for `dburi`
    and uses SQLAlchemy's inspection and reflection facilities on its engine.
    """

    def __init__(self, dburi: str):
        """
        :param dburi: SQLAlchemy connection URL of the database to inspect.
        """
        self.dburi = dburi
        self.adapter = DatabaseAdapter(dburi=self.dburi)
        self.engine = self.adapter.engine
        self.inspector = sa.inspect(self.engine)

    def table_names(self):
        """
        Return the names of the tables found in the system schema.
        """
        return self.inspector.get_table_names(schema=SystemTableKnowledge.SYS_SCHEMA)

    def ddl(
        self,
        tablename_in: str,
        tablename_out: str,
        out_schema: t.Optional[str] = None,
        with_drop_table: bool = False,
    ) -> str:
        """
        Render SQL DDL for re-creating a system table under a different name.

        :param tablename_in: Name of the table to reflect from the system schema.
        :param tablename_out: Table name to use in the generated statements.
        :param out_schema: Schema name to use in the generated statements,
            or `None` to emit an unqualified table name.
        :param with_drop_table: Whether to emit a `DROP TABLE IF EXISTS`
            statement in front of the `CREATE TABLE IF NOT EXISTS` statement.
        :return: The statements, each terminated by a semicolon and a newline.
        """
        meta = sa.MetaData(schema=SystemTableKnowledge.SYS_SCHEMA)
        table = sa.Table(tablename_in, meta, autoload_with=self.engine)

        # Retarget the reflected definition, so that the compiled statements
        # refer to the output name and schema instead of the original ones.
        table.schema = out_schema
        table.name = tablename_out

        statements = []
        if with_drop_table:
            statements.append(sa.schema.DropTable(table, if_exists=True))
        statements.append(sa.schema.CreateTable(table, if_not_exists=True))

        return "".join(statement.compile(self.engine).string.strip() + ";\n" for statement in statements)


class SystemTableExporter:
    """
    Export schema and data from CrateDB system tables.

    For each table of the system schema, `save` writes a `.sql` file with
    the table's DDL and, unless the table is empty, a data file in the
    selected `data_format`.
    """

    def __init__(self, dburi: str, target: Path, data_format: DataFormat = "jsonl"):
        """
        :param dburi: SQLAlchemy connection URL of the database to export from.
        :param target: Base directory for exports. It is created right away,
            including missing parent directories.
        :param data_format: Format of the data files to write.
        """
        self.dburi = dburi
        self.target = target
        self.data_format = data_format
        self.adapter = DatabaseAdapter(dburi=self.dburi)
        self.info = InfoContainer(adapter=self.adapter)
        self.inspector = SystemTableInspector(dburi=self.dburi)
        self.target.mkdir(exist_ok=True, parents=True)

    def read_table(self, tablename: str) -> pl.DataFrame:
        """
        Read all rows of the given system table into a polars data frame.
        """
        sql = f'SELECT * FROM "{SystemTableKnowledge.SYS_SCHEMA}"."{tablename}"'  # noqa: S608
        logger.debug(f"Running SQL: {sql}")
        return pl.read_database(
            query=sql,  # noqa: S608
            connection=self.adapter.engine,
            infer_schema_length=1000,
        )

    def dump_table(self, frame: pl.DataFrame, file: t.Union[t.TextIO, None] = None):
        """
        Serialize a data frame in the configured data format.

        :param frame: The data frame to serialize.
        :param file: Open file to write to, or `None` to hand `None` to the
            underlying writer instead of a file.
        :return: Whatever the underlying writer returns.
        :raises NotImplementedError: When the configured format is not supported.
        """
        if self.data_format == "csv":
            # The polars CSV writer is not used, because it fails on nested data:
            # polars.exceptions.ComputeError: CSV format does not support nested data
            return frame.to_pandas().to_csv(file)

        # Hand the polars writers a binary handle. A text-mode file exposes one
        # as `.buffer`, while a file opened in binary mode (as `save` does for
        # parquet) already is one and has no `.buffer` attribute.
        binary_target = None if file is None else getattr(file, "buffer", file)

        if self.data_format in ["jsonl", "ndjson"]:
            return frame.write_ndjson(binary_target)  # type: ignore[arg-type]
        if self.data_format in ["parquet", "pq"]:
            return frame.write_parquet(binary_target)  # type: ignore[arg-type]
        raise NotImplementedError(f"Output format not implemented: {self.data_format}")

    def save(self) -> Path:
        """
        Export all system tables and return the directory they were written to.

        The directory is `<target>/<cluster name>/<timestamp>/sys`. It contains
        one subdirectory for schema files and one for data files. Tables on the
        reflection blocklist are skipped entirely; empty tables get a schema
        file, but no data file.
        """
        timestamp = dt.datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
        path = self.target / self.info.cluster_name / timestamp / "sys"
        logger.info(f"Exporting system tables to: {path}")
        system_tables = self.inspector.table_names()

        path_schema = path / ExportSettings.SCHEMA_PATH
        path_data = path / ExportSettings.DATA_PATH
        path_schema.mkdir(parents=True, exist_ok=True)
        path_data.mkdir(parents=True, exist_ok=True)

        # Parquet is a binary format; all other formats are written in text mode.
        mode = "wb" if self.data_format in ["parquet", "pq"] else "w"

        table_count = 0
        for tablename in tqdm(system_tables, disable=None):
            logger.debug(f"Exporting table: {tablename}")
            if tablename in SystemTableKnowledge.REFLECTION_BLOCKLIST:
                continue

            tablename_out = f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}"
            path_table_schema = path_schema / f"{tablename_out}.sql"
            path_table_data = path_data / f"{tablename_out}.{self.data_format}"

            # Write schema file.
            with open(path_table_schema, "w") as fh_schema:
                print(self.inspector.ddl(tablename_in=tablename, tablename_out=tablename_out), file=fh_schema)

            # Write data file, unless there is no data.
            df = self.read_table(tablename=tablename)
            if df.is_empty():
                continue

            table_count += 1
            with open(path_table_data, mode) as fh_data:
                self.dump_table(frame=df, file=t.cast(t.TextIO, fh_data))

        logger.info(f"Successfully exported {table_count} system tables")
        return path


class SystemTableImporter:
    """
    Import schema and data about CrateDB system tables.

    Reads a directory written by the exporter: a schema subdirectory with
    one `.sql` file per table, and a data subdirectory with the data files.
    """

    def __init__(self, dburi: str, source: Path, data_format: DataFormat = "jsonl", debug: bool = False):
        """
        :param dburi: SQLAlchemy connection URL of the database to import into.
        :param source: Directory containing the schema and data subdirectories.
        :param data_format: Format, and file name extension, of the data files to read.
        :param debug: Passed on to `error_logger` when reporting a failed table import.
        """
        self.dburi = dburi
        self.source = source
        self.data_format = data_format
        self.debug = debug
        self.adapter = DatabaseAdapter(dburi=self.dburi)

    def table_names(self):
        """
        Return the table names derived from the `.sql` files in the schema directory.

        Only the leading file name prefix and the trailing `.sql` extension are
        removed, so table names containing either text elsewhere stay intact.
        """
        path_schema = self.source / ExportSettings.SCHEMA_PATH
        prefix = ExportSettings.TABLE_FILENAME_PREFIX
        names = []
        for item in path_schema.glob("*.sql"):
            name = item.stem
            if name.startswith(prefix):
                name = name[len(prefix) :]
            names.append(name)
        return names

    def load(self):
        """
        Create the tables and load their data into the database.

        Tables without a data file, or with an empty one, are skipped. A failure
        while loading the data of a single table is reported through
        `error_logger` and does not stop the import of the remaining tables.

        :raises FileNotFoundError: When the schema directory does not exist.
        """
        path_schema = self.source / ExportSettings.SCHEMA_PATH
        path_data = self.source / ExportSettings.DATA_PATH

        if not path_schema.exists():
            raise FileNotFoundError(f"Path does not exist: {path_schema}")

        logger.info(f"Importing system tables from: {self.source}")

        table_count = 0
        for tablename in tqdm(self.table_names()):
            tablename_restored = ExportSettings.TABLE_FILENAME_PREFIX + tablename

            path_table_schema = path_schema / f"{tablename_restored}.sql"
            path_table_data = path_data / f"{tablename_restored}.{self.data_format}"

            # Skip import of non-existing or empty files.
            if not path_table_data.exists() or path_table_data.stat().st_size == 0:
                continue

            # Invoke SQL DDL.
            schema_sql = path_table_schema.read_text()
            self.adapter.run_sql(schema_sql)

            # Load data.
            try:
                df: pl.DataFrame = self.load_table(path_table_data)
                df.write_database(table_name=tablename_restored, connection=self.dburi, if_table_exists="append")
            except Exception as ex:
                error_logger(self.debug)(f"Importing table failed: {tablename}. Reason: {ex}")
            else:
                # Count only tables whose data actually made it into the database.
                table_count += 1

        logger.info(f"Successfully imported {table_count} system tables")

    def load_table(self, path: Path) -> pl.DataFrame:
        """
        Read a data file into a polars data frame, selecting the reader by file extension.

        :raises NotImplementedError: When there is no reader for the file extension.
        """
        # Both extensions are produced by the exporter for newline-delimited JSON.
        if path.suffix in [".jsonl", ".ndjson"]:
            return pl.read_ndjson(path)
        if path.suffix in [".parquet", ".pq"]:
            return pl.read_parquet(path)
        raise NotImplementedError(f"Input format not implemented: {path.suffix}")

Check warning on line 227 in cratedb_toolkit/cfr/systable.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cfr/systable.py#L227

Added line #L227 was not covered by tests


# Apply the encoder patch from `cratedb_toolkit.sqlalchemy.patch` as a side
# effect of importing this module.
# NOTE(review): presumably needed to serialize values found in system tables — confirm.
patch_encoder()
4 changes: 4 additions & 0 deletions cratedb_toolkit/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@

from cratedb_toolkit.util.cli import boot_click

from .cfr.cli import cli as cfr_cli
from .cluster.cli import cli as cloud_cli
from .io.cli import cli as io_cli
from .job.cli import cli_list_jobs
from .shell.cli import cli as shell_cli
from .wtf.cli import cli as wtf_cli


@click.group(cls=ClickAliasedGroup) # type: ignore[arg-type]
Expand All @@ -18,7 +20,9 @@ def cli(ctx: click.Context, verbose: bool, debug: bool):
return boot_click(ctx, verbose, debug)


cli.add_command(cfr_cli, name="cfr")
cli.add_command(cloud_cli, name="cluster")
cli.add_command(io_cli, name="load")
cli.add_command(shell_cli, name="shell")
cli.add_command(wtf_cli, name="wtf")
cli.add_command(cli_list_jobs)
1 change: 1 addition & 0 deletions cratedb_toolkit/retention/strategy/delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
In CrateDB, tables for storing retention policies need to be created once manually.
See the file setup/schema.sql in this repository.
"""

import dataclasses
import logging

Expand Down
1 change: 1 addition & 0 deletions cratedb_toolkit/retention/strategy/reallocate.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- Tables for storing retention policies need to be created once manually in
CrateDB. See the file setup/schema.sql in this repository.
"""

import dataclasses
import logging

Expand Down
1 change: 1 addition & 0 deletions cratedb_toolkit/retention/strategy/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
In CrateDB, tables for storing retention policies need to be created once manually.
See the file setup/schema.sql in this repository.
"""

import dataclasses
import logging

Expand Down
Loading