-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
cfr: Add
ctk cfr
diagnostics program
Add basic implementation for `sys-export` and `sys-import` subcommands. It is about exporting system tables of CrateDB into SQL DDL and JSONL files, and re-importing them for later analysis.
- Loading branch information
Showing
14 changed files
with
483 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ __pycache__ | |
dist | ||
.coverage* | ||
coverage.xml | ||
/cfr | ||
/foo | ||
/tmp | ||
/DOWNLOAD |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
# CrateDB Cluster Flight Recorder (CFR) | ||
|
||
Collect required cluster information for support requests | ||
and self-service debugging. | ||
|
||
|
||
## Synopsis | ||
|
||
Define CrateDB database cluster address. | ||
```shell | ||
export CRATEDB_SQLALCHEMY_URL=crate://localhost/ | ||
``` | ||
|
||
Export system table information into a timestamped directory, | ||
by default below the `cfr/sys` directory. | ||
```shell | ||
ctk cfr sys-export | ||
``` | ||
|
||
|
||
## Usage | ||
|
||
Export system table information into given directory. | ||
```shell | ||
ctk cfr sys-export file:///var/ctk/cfr/sys | ||
``` | ||
|
||
Import system table information from given directory. | ||
```shell | ||
ctk cfr sys-import file://./cfr/sys/2024-04-16T05-43-37 | ||
``` | ||
|
||
In order to define the CrateDB database address on the | ||
command line, use a command like this. | ||
```shell | ||
ctk cfr --cratedb-sqlalchemy-url=crate://localhost/ sys-export | ||
``` | ||
|
||
|
||
## OCI | ||
|
||
If you don't want or can't install the program, you can also use its OCI | ||
container image, for example on Docker, Podman, or Kubernetes. | ||
|
||
Optionally, start a CrateDB single-node instance for testing purposes. | ||
```shell | ||
docker run --rm -it \ | ||
--name=cratedb --publish=4200:4200 --env=CRATE_HEAP_SIZE=4g \ | ||
crate/crate:nightly -Cdiscovery.type=single-node | ||
``` | ||
|
||
Define the database URI address, and an alias to the `cfr` program. | ||
```shell | ||
echo "CRATEDB_SQLALCHEMY_URL=crate://localhost/" > .env | ||
alias cfr="docker run --rm -it --network=host --volume=$(pwd)/cfr:/cfr --env-file=.env ghcr.io/crate-workbench/cratedb-toolkit:latest ctk cfr" | ||
``` | ||
|
||
Verify everything works. | ||
```shell | ||
cfr --help | ||
``` |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
# CFR Backlog | ||
|
||
## Iteration +1 | ||
- sys-export: Does the program need capabilities to **LIMIT** cardinality | ||
on `sys-export` operations, for example, when they are super large? | ||
- sys-import: Accept target database schema. | ||
- Combine with `ctk wtf info` | ||
- Converge output into tar archive | ||
|
||
## Iteration +2 | ||
- The cluster name must go into the path, like `cfr/<name>/sys/<timestamp>`, for multi-tenancy operations. | ||
|
||
## Iteration +3 | ||
- How to get hold of the `crate.yaml`? | ||
- How to get hold of the log files? `docker logs`? |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
# Copyright (c) 2021-2024, Crate.io Inc. | ||
# Distributed under the terms of the AGPLv3 license, see LICENSE. | ||
import logging | ||
import sys | ||
|
||
import click | ||
from click_aliases import ClickAliasedGroup | ||
|
||
from cratedb_toolkit.cfr.systable import SystemTableExporter, SystemTableImporter | ||
from cratedb_toolkit.util.cli import ( | ||
boot_click, | ||
error_logger, | ||
make_command, | ||
) | ||
from cratedb_toolkit.util.data import jd, path_from_url | ||
|
||
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Reusable `--cratedb-sqlalchemy-url` command-line option. It is declared with
# `envvar="CRATEDB_SQLALCHEMY_URL"`, so the value can also be supplied through
# the environment.
cratedb_sqlalchemy_option = click.option(
    "--cratedb-sqlalchemy-url",
    envvar="CRATEDB_SQLALCHEMY_URL",
    type=str,
    required=False,
    help="CrateDB SQLAlchemy URL",
)
|
||
|
||
@click.group(cls=ClickAliasedGroup)  # type: ignore[arg-type]
@cratedb_sqlalchemy_option
@click.option("--verbose", is_flag=True, required=False, help="Turn on logging")
@click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level")
@click.option("--scrub", envvar="SCRUB", is_flag=True, required=False, help="Blank out identifiable information")
@click.version_option()
@click.pass_context
def cli(ctx: click.Context, cratedb_sqlalchemy_url: str, verbose: bool, debug: bool, scrub: bool):
    """
    Diagnostics and informational utilities.
    """
    # NOTE: The docstring above doubles as the help text of the command group,
    #       so it is deliberately kept short.

    # Both subcommands read the database address from the context,
    # so refuse to continue when it has not been supplied.
    if not cratedb_sqlalchemy_url:
        logger.error("Unable to operate without database address")
        sys.exit(1)

    # Hand the group-level settings to the subcommands through `ctx.meta`.
    ctx.meta["cratedb_sqlalchemy_url"] = cratedb_sqlalchemy_url
    ctx.meta["scrub"] = scrub

    return boot_click(ctx, verbose, debug)
|
||
|
||
@make_command(cli, "sys-export")
@click.argument("target", envvar="CFR_TARGET", type=str, required=False, default="file://./cfr/sys")
@click.pass_context
def sys_export(ctx: click.Context, target: str):
    # Export the system tables to a location derived from the `target` URL, and
    # emit the resulting path through `jd`. Any failure is forwarded to the
    # error logger, and the program terminates with exit code 1.
    # Documented with a comment instead of a docstring, in order not to alter
    # the help output of the command.
    dburi = ctx.meta["cratedb_sqlalchemy_url"]
    try:
        exporter = SystemTableExporter(dburi=dburi, target=path_from_url(target))
        export_path = exporter.save()
        jd({"path": str(export_path)})
    except Exception as ex:
        error_logger(ctx)(ex)
        sys.exit(1)
|
||
|
||
@make_command(cli, "sys-import")
@click.argument("source", envvar="CFR_SOURCE", type=str, required=True)
@click.pass_context
def sys_import(ctx: click.Context, source: str):
    # Load a previous export, located by the `source` URL, into the database.
    # Any failure is forwarded to the error logger, and the program terminates
    # with exit code 1.
    # Documented with a comment instead of a docstring, in order not to alter
    # the help output of the command.
    dburi = ctx.meta["cratedb_sqlalchemy_url"]
    try:
        importer = SystemTableImporter(dburi=dburi, source=path_from_url(source))
        importer.load()
    except Exception as ex:
        error_logger(ctx)(ex)
        sys.exit(1)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,222 @@ | ||
""" | ||
CrateDB Diagnostics: System Tables Exporter and Importer. | ||
Schemas and results of the following queries should be included: | ||
```sql | ||
SELECT * FROM sys.cluster | ||
SELECT * FROM sys.nodes | ||
SELECT * FROM sys.shards | ||
SELECT * FROM sys.allocations | ||
SELECT * FROM sys.jobs_log | ||
SELECT * FROM sys.operations_log | ||
``` | ||
https://cratedb.com/docs/python/en/latest/by-example/sqlalchemy/inspection-reflection.html | ||
https://docs.sqlalchemy.org/en/20/faq/metadata_schema.html#how-can-i-get-the-create-table-drop-table-output-as-a-string | ||
""" | ||
|
||
import datetime as dt | ||
import logging | ||
import typing as t | ||
from pathlib import Path | ||
|
||
import polars as pl | ||
import sqlalchemy as sa | ||
from tqdm import tqdm | ||
|
||
from cratedb_toolkit.sqlalchemy.patch import patch_encoder | ||
from cratedb_toolkit.util import DatabaseAdapter | ||
from cratedb_toolkit.util.cli import error_logger | ||
|
||
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Data file formats accepted by the exporter and importer. The exporter uses
# the value verbatim as the file name extension of the data files.
# `pq` is a short form of `parquet`: `dump_table`, `save`, and `load_table`
# all test for it, so it is part of the accepted values.
DataFormat = t.Literal["csv", "jsonl", "ndjson", "parquet", "pq"]
|
||
|
||
class SystemTableKnowledge:
    """
    Bits of knowledge about CrateDB internals, needed to handle its system tables.
    """

    # The schema where CrateDB keeps its system tables.
    SYS_SCHEMA = "sys"

    # Tables to leave out when exporting.
    # TODO: Reflecting the `summits` table raises an error.
    #       AttributeError: 'UserDefinedType' object has no attribute 'get_col_spec'
    REFLECTION_BLOCKLIST = ["summits"]
|
||
|
||
class ExportSettings:
    """
    On-disk layout conventions shared by the system table exporter and importer.
    """

    # Names of the subdirectories for SQL DDL files and for data files.
    SCHEMA_PATH = "schema"
    DATA_PATH = "data"

    # Prepended to the table name, both for file names on disk
    # and for the name of the table created on import.
    TABLE_FILENAME_PREFIX = "sys-"
|
||
|
||
class SystemTableInspector:
    """
    Reflect schema information about CrateDB system tables, using SQLAlchemy.
    """

    def __init__(self, dburi: str):
        self.dburi = dburi
        self.adapter = DatabaseAdapter(dburi=self.dburi)
        self.engine = self.adapter.engine
        self.inspector = sa.inspect(self.engine)

    def table_names(self):
        """
        Return the table names the SQLAlchemy inspector reports for the system schema.
        """
        sys_schema = SystemTableKnowledge.SYS_SCHEMA
        return self.inspector.get_table_names(schema=sys_schema)

    def ddl(self, tablename_in: str, tablename_out: str, out_schema: str = None, with_drop_table: bool = False) -> str:
        """
        Render SQL DDL for a system table, addressed under a different name.

        :param tablename_in: Name of the table to reflect from the system schema.
        :param tablename_out: Table name to use within the generated statements.
        :param out_schema: Schema name to use within the generated statements.
        :param with_drop_table: Whether to emit a `DROP TABLE IF EXISTS` statement
                                before the `CREATE TABLE IF NOT EXISTS` statement.
        :return: The statements, each one terminated by a semicolon and a newline.
        """
        metadata = sa.MetaData(schema=SystemTableKnowledge.SYS_SCHEMA)
        table = sa.Table(tablename_in, metadata, autoload_with=self.engine)

        # Re-label the reflected table, so the statements address the output table.
        table.schema = out_schema
        table.name = tablename_out

        statements = []
        if with_drop_table:
            statements.append(sa.schema.DropTable(table, if_exists=True))
        statements.append(sa.schema.CreateTable(table, if_not_exists=True))

        return "".join(f"{statement.compile(self.engine).string.strip()};\n" for statement in statements)
|
||
|
||
class SystemTableExporter:
    """
    Export schema and data from CrateDB system tables.

    Each invocation of `save` creates a timestamped directory below `target`.
    It contains one SQL DDL file per table within the schema subdirectory, and
    one data file per non-empty table within the data subdirectory.
    """

    def __init__(self, dburi: str, target: Path, data_format: DataFormat = "jsonl"):
        """
        :param dburi: SQLAlchemy URL of the CrateDB database to export from.
        :param target: Base directory for exports. It is created when missing.
        :param data_format: Format of the data files, also used as file name extension.
        """
        self.dburi = dburi
        self.target = target
        self.data_format = data_format
        self.adapter = DatabaseAdapter(dburi=self.dburi)
        self.engine = self.adapter.engine
        self.inspector = SystemTableInspector(dburi=self.dburi)
        self.target.mkdir(exist_ok=True, parents=True)

    def read_table(self, tablename: str) -> pl.DataFrame:
        """
        Read all records of the given system table into a Polars data frame.
        """
        # The table name is interpolated into the statement, hence the S608 suppression.
        sql = f'SELECT * FROM "{SystemTableKnowledge.SYS_SCHEMA}"."{tablename}"'  # noqa: S608
        # logger.info(f"Running SQL: {sql}")  # noqa: ERA001
        return pl.read_database(
            query=sql,  # noqa: S608
            connection=self.engine,
        )

    def dump_table(self, frame: pl.DataFrame, file: t.Optional[t.IO[t.Any]] = None):
        """
        Serialize a data frame in the configured data format.

        :param frame: The data frame to serialize.
        :param file: Open file to write to, in either text or binary mode.
        :return: Whatever the underlying writer function returns.
        :raises NotImplementedError: When the data format is not supported.
        """
        if self.data_format == "csv":
            # Polars can not write nested data to CSV, so take the detour over pandas.
            # polars.exceptions.ComputeError: CSV format does not support nested data
            return frame.to_pandas().to_csv(file)

        # The Polars writers get a binary sink. A text-mode file exposes its
        # binary layer as `.buffer`. A binary-mode file, as opened by `save` for
        # Parquet, has no such attribute, and is used as is. Accessing `.buffer`
        # unconditionally raised an `AttributeError` on Parquet exports.
        sink = None if file is None else getattr(file, "buffer", file)

        if self.data_format in ["jsonl", "ndjson"]:
            return frame.write_ndjson(sink)
        elif self.data_format in ["parquet", "pq"]:
            return frame.write_parquet(sink)
        else:
            raise NotImplementedError(f"Output format not implemented: {self.data_format}")

    def save(self) -> Path:
        """
        Export all system tables, except the blocklisted ones.

        :return: Path to the timestamped export directory.
        """
        timestamp = dt.datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
        path = self.target / timestamp
        logger.info(f"Exporting system tables to: {path}")
        system_tables = self.inspector.table_names()
        path_schema = path / ExportSettings.SCHEMA_PATH
        path_data = path / ExportSettings.DATA_PATH
        path_schema.mkdir(parents=True, exist_ok=True)
        path_data.mkdir(parents=True, exist_ok=True)

        # Parquet is a binary format, all other formats are written in text mode.
        mode = "wb" if self.data_format in ["parquet", "pq"] else "w"

        table_count = 0
        for tablename in tqdm(system_tables, disable=None):
            if tablename in SystemTableKnowledge.REFLECTION_BLOCKLIST:
                continue

            table_count += 1

            path_table_schema = path_schema / f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}.sql"
            path_table_data = path_data / f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}.{self.data_format}"
            tablename_out = f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}"

            # Write schema file.
            with open(path_table_schema, "w") as fh_schema:
                print(self.inspector.ddl(tablename_in=tablename, tablename_out=tablename_out), file=fh_schema)

            # Write data file. Empty tables get a schema file only.
            df = self.read_table(tablename=tablename)
            if df.is_empty():
                continue
            with open(path_table_data, mode) as fh_data:
                self.dump_table(frame=df, file=fh_data)

        logger.info(f"Successfully exported {table_count} system tables")
        return path
|
||
|
||
class SystemTableImporter:
    """
    Import schema and data about CrateDB system tables.

    The source directory is expected to contain the schema and data
    subdirectories, as named by `ExportSettings`.
    """

    def __init__(self, dburi: str, source: Path, data_format: DataFormat = "jsonl", debug: bool = False):
        """
        :param dburi: SQLAlchemy URL of the CrateDB database to import into.
        :param source: Directory of a previous export.
        :param data_format: Format of the data files, also used as file name extension.
        :param debug: Passed on to `error_logger` when importing a table fails.
        """
        self.dburi = dburi
        self.source = source
        self.data_format = data_format
        self.debug = debug
        self.adapter = DatabaseAdapter(dburi=self.dburi)

    def table_names(self):
        """
        Return table names derived from the `*.sql` files within the schema directory.

        The `.sql` extension and a leading file name prefix are removed. Both are
        stripped at the ends of the file name only, so names that contain the
        prefix or `.sql` elsewhere survive unharmed.
        """
        prefix = ExportSettings.TABLE_FILENAME_PREFIX
        path_schema = self.source / ExportSettings.SCHEMA_PATH
        names = []
        for item in path_schema.glob("*.sql"):
            name = item.stem
            if name.startswith(prefix):
                name = name[len(prefix):]
            names.append(name)
        return names

    def load(self):
        """
        Create tables from the schema files, and load the corresponding data files.

        Tables without a data file, or with an empty one, are skipped altogether.
        A failure on loading the data of a table is reported, and the import
        continues with the next table.

        :raises FileNotFoundError: When the schema directory does not exist.
        """
        path_schema = self.source / ExportSettings.SCHEMA_PATH
        path_data = self.source / ExportSettings.DATA_PATH

        if not path_schema.exists():
            raise FileNotFoundError(f"Path does not exist: {path_schema}")

        logger.info(f"Importing system tables from: {self.source}")

        for tablename in tqdm(self.table_names()):
            # The table is restored under its prefixed name, like the DDL file defines it.
            tablename_restored = ExportSettings.TABLE_FILENAME_PREFIX + tablename

            path_table_schema = path_schema / f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}.sql"
            path_table_data = path_data / f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}.{self.data_format}"

            # Skip import of non-existing or empty files.
            if not path_table_data.exists() or path_table_data.stat().st_size == 0:
                continue

            # Invoke SQL DDL.
            schema_sql = path_table_schema.read_text()
            self.adapter.run_sql(schema_sql)

            # Load data. This is best-effort: report the failure, and carry on.
            try:
                df: pl.DataFrame = self.load_table(path_table_data)
                df.write_database(table_name=tablename_restored, connection=self.dburi, if_table_exists="append")
            except Exception as ex:
                # NOTE(review): `error_logger` receives a boolean here. Confirm that it accepts one.
                error_logger(self.debug)(f"Importing table failed: {tablename}. Reason: {ex}")

            # df.to_pandas().to_sql(name=tablename, con=self.adapter.engine, if_exists="append", index=False)  # noqa: ERA001, E501

    def load_table(self, path: Path) -> pl.DataFrame:
        """
        Read a data file into a Polars data frame, selecting the reader by file name extension.

        :param path: Path to a `.jsonl`, `.ndjson`, `.parquet`, or `.pq` file.
        :raises NotImplementedError: When the file name extension is not supported.
        """
        # `.ndjson` is accepted as well, because the exporter uses the data format
        # label as file name extension, and `ndjson` is a valid data format.
        if path.suffix in [".jsonl", ".ndjson"]:
            return pl.read_ndjson(path)
        elif path.suffix in [".parquet", ".pq"]:
            return pl.read_parquet(path)
        else:
            raise NotImplementedError(f"Input format not implemented: {path.suffix}")
|
||
|
||
# Invoked at import time of this module.
# NOTE(review): Presumably adjusts how values are encoded when talking to
# CrateDB, see `cratedb_toolkit.sqlalchemy.patch` — TODO confirm.
patch_encoder()
Oops, something went wrong.