From 528ea346f9853031fadd8a9279548646daf6b426 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Fri, 1 Dec 2023 14:13:58 +0100 Subject: [PATCH 01/15] wtf: Add `ctk wtf` diagnostics program --- .gitignore | 3 +- CHANGES.md | 2 + cratedb_toolkit/cli.py | 2 + cratedb_toolkit/util/cli.py | 8 +- cratedb_toolkit/util/data.py | 19 +- cratedb_toolkit/util/platform.py | 56 +++ cratedb_toolkit/util/service.py | 23 + cratedb_toolkit/wtf/README.md | 51 +++ cratedb_toolkit/wtf/__init__.py | 0 cratedb_toolkit/wtf/backlog.md | 39 ++ cratedb_toolkit/wtf/cli.py | 188 ++++++++ cratedb_toolkit/wtf/core.py | 81 ++++ cratedb_toolkit/wtf/http.py | 41 ++ cratedb_toolkit/wtf/library.py | 601 +++++++++++++++++++++++++ cratedb_toolkit/wtf/model.py | 86 ++++ cratedb_toolkit/wtf/query_collector.py | 247 ++++++++++ cratedb_toolkit/wtf/util.py | 21 + pyproject.toml | 7 +- tests/retention/test_cli.py | 2 +- tests/wtf/__init__.py | 0 tests/wtf/test_cli.py | 111 +++++ 21 files changed, 1579 insertions(+), 9 deletions(-) create mode 100644 cratedb_toolkit/util/platform.py create mode 100644 cratedb_toolkit/util/service.py create mode 100644 cratedb_toolkit/wtf/README.md create mode 100644 cratedb_toolkit/wtf/__init__.py create mode 100644 cratedb_toolkit/wtf/backlog.md create mode 100644 cratedb_toolkit/wtf/cli.py create mode 100644 cratedb_toolkit/wtf/core.py create mode 100644 cratedb_toolkit/wtf/http.py create mode 100644 cratedb_toolkit/wtf/library.py create mode 100644 cratedb_toolkit/wtf/model.py create mode 100644 cratedb_toolkit/wtf/query_collector.py create mode 100644 cratedb_toolkit/wtf/util.py create mode 100644 tests/wtf/__init__.py create mode 100644 tests/wtf/test_cli.py diff --git a/.gitignore b/.gitignore index 8e95088c..c272cda7 100644 --- a/.gitignore +++ b/.gitignore @@ -8,5 +8,6 @@ __pycache__ dist .coverage* coverage.xml +/foo /tmp -DOWNLOAD +/DOWNLOAD diff --git a/CHANGES.md b/CHANGES.md index 74e2a373..f261bc05 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,8 @@ ## Unreleased +- Add `cratedb-wtf` diagnostics program + ## 2024/06/11 v0.0.13 - Dependencies: Migrate from `crate[sqlalchemy]` to `sqlalchemy-cratedb` diff --git a/cratedb_toolkit/cli.py b/cratedb_toolkit/cli.py index 26c0ef65..9315a701 100644 --- a/cratedb_toolkit/cli.py +++ b/cratedb_toolkit/cli.py @@ -7,6 +7,7 @@ from .io.cli import cli as io_cli from .job.cli import cli_list_jobs from .shell.cli import cli as shell_cli +from .wtf.cli import cli as wtf_cli @click.group(cls=ClickAliasedGroup) # type: ignore[arg-type] @@ -21,4 +22,5 @@ def cli(ctx: click.Context, verbose: bool, debug: bool): cli.add_command(cloud_cli, name="cluster") cli.add_command(io_cli, name="load") cli.add_command(shell_cli, name="shell") +cli.add_command(wtf_cli, name="wtf") cli.add_command(cli_list_jobs) diff --git a/cratedb_toolkit/util/cli.py b/cratedb_toolkit/util/cli.py index 9ebcb3ab..f5956dea 100644 --- a/cratedb_toolkit/util/cli.py +++ b/cratedb_toolkit/util/cli.py @@ -72,13 +72,15 @@ def boot_with_dburi(): return dburi -def make_command(cli, name, helpfun=None, aliases=None): +def make_command(cli, name, help=None, aliases=None): # noqa: A002 """ Convenience shortcut for creating a subcommand. 
""" kwargs = {} - if helpfun: - kwargs["help"] = docstring_format_verbatim(helpfun.__doc__) + if isinstance(help, str): + kwargs["help"] = help + elif callable(help): + kwargs["help"] = docstring_format_verbatim(help.__doc__) return cli.command( name, context_settings={"max_content_width": 120}, diff --git a/cratedb_toolkit/util/data.py b/cratedb_toolkit/util/data.py index 120cd947..f8efb1bc 100644 --- a/cratedb_toolkit/util/data.py +++ b/cratedb_toolkit/util/data.py @@ -1,3 +1,4 @@ +import datetime as dt import json import sys import typing as t @@ -7,7 +8,7 @@ def jd(data: t.Any): """ Pretty-print JSON with indentation. """ - print(json.dumps(data, indent=2), file=sys.stdout) # noqa: T201 + print(json.dumps(data, indent=2, cls=JSONEncoderPlus), file=sys.stdout) # noqa: T201 def str_contains(haystack, *needles): @@ -18,9 +19,9 @@ def str_contains(haystack, *needles): return any(needle in haystack for needle in needles) -# from sqlalchemy.util.langhelpers -# from paste.deploy.converters def asbool(obj: t.Any) -> bool: + # from sqlalchemy.util.langhelpers + # from paste.deploy.converters if isinstance(obj, str): obj = obj.strip().lower() if obj in ["true", "yes", "on", "y", "t", "1"]: @@ -30,3 +31,15 @@ def asbool(obj: t.Any) -> bool: else: raise ValueError("String is not true/false: %r" % obj) return bool(obj) + + +class JSONEncoderPlus(json.JSONEncoder): + """ + https://stackoverflow.com/a/27058505 + """ + + def default(self, o): + if isinstance(o, dt.datetime): + return o.isoformat() + + return json.JSONEncoder.default(self, o) diff --git a/cratedb_toolkit/util/platform.py b/cratedb_toolkit/util/platform.py new file mode 100644 index 00000000..0c588dfd --- /dev/null +++ b/cratedb_toolkit/util/platform.py @@ -0,0 +1,56 @@ +import io +import json +from contextlib import redirect_stdout + + +class PlatformInfo: + @staticmethod + def application(): + import platform + + from cratedb_toolkit import __appname__, __version__ + + data = {} + + data["platform"] = platform.platform() + data["version"] = __version__ + data["name"] = __appname__ + return data + + @staticmethod + def libraries(): + data = {} + + # SQLAlchemy + from importlib.metadata import entry_points + + try: + import sqlalchemy.dialects.plugins + import sqlalchemy.dialects.registry + + data["sqlalchemy"] = { + "dialects_builtin": list(sqlalchemy.dialects.registry.impls.keys()), + "dialects_3rdparty": [dialect.name for dialect in entry_points(group="sqlalchemy.dialects")], # type: ignore[attr-defined,call-arg] + "plugins": list(sqlalchemy.dialects.plugins.impls.keys()), + } + except Exception: # noqa: S110 + pass + + # pandas + try: + import pandas + + buffer = io.StringIO() + with redirect_stdout(buffer): + pandas.show_versions(as_json=True) + buffer.seek(0) + data["pandas"] = json.load(buffer) + except Exception: # noqa: S110 + pass + + # fsspec + import fsspec + + data["fsspec"] = {"protocols": fsspec.available_protocols(), "compressions": fsspec.available_compressions()} + + return data diff --git a/cratedb_toolkit/util/service.py b/cratedb_toolkit/util/service.py new file mode 100644 index 00000000..a26c70b4 --- /dev/null +++ b/cratedb_toolkit/util/service.py @@ -0,0 +1,23 @@ +# Copyright (c) 2021-2023, Crate.io Inc. +# Distributed under the terms of the AGPLv3 license, see LICENSE. 
+import logging
+import typing as t
+
+from cratedb_toolkit.util.common import setup_logging
+
+logger = logging.getLogger(__name__)
+
+
+def start_service(app: str, listen_address: t.Union[str, None] = None, reload: bool = False):  # pragma: no cover
+    setup_logging()
+    from uvicorn import run
+
+    if listen_address is None:
+        listen_address = "127.0.0.1:4242"
+
+    host, port = listen_address.split(":")
+    port_int = int(port)
+
+    logger.info(f"Starting HTTP web service on http://{listen_address}")
+
+    run(app=app, host=host, port=port_int, reload=reload)
diff --git a/cratedb_toolkit/wtf/README.md b/cratedb_toolkit/wtf/README.md
new file mode 100644
index 00000000..9f8c5777
--- /dev/null
+++ b/cratedb_toolkit/wtf/README.md
@@ -0,0 +1,51 @@
+# cratedb-wtf
+
+A diagnostics utility in the spirit of [git-wtf], [grafana-wtf], and [pip.wtf].
+It is still a work-in-progress, but it is usable already.
+
+
+## Synopsis
+
+Define CrateDB database cluster address.
+```shell
+export CRATEDB_SQLALCHEMY_URL=crate://localhost/
+```
+
+Display system and database cluster information.
+```shell
+cratedb-wtf info
+```
+
+Display database cluster job information.
+```shell
+cratedb-wtf job-info
+```
+
+Display database cluster log messages.
+```shell
+cratedb-wtf logs
+```
+
+Statistics.
+```shell
+cratedb-wtf job-statistics collect
+cratedb-wtf job-statistics view
+```
+
+
+## HTTP API
+
+Expose collected status information.
+```shell
+cratedb-wtf --debug serve --reload
+```
+Consume collected status information via HTTP.
+```shell
+http http://127.0.0.1:4242/info/all
+```
+
+
+
+[git-wtf]: http://thrawn01.org/posts/2014/03/03/git-wtf/
+[grafana-wtf]: https://github.com/panodata/grafana-wtf
+[pip.wtf]: https://github.com/sabslikesobs/pip.wtf
diff --git a/cratedb_toolkit/wtf/__init__.py b/cratedb_toolkit/wtf/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/cratedb_toolkit/wtf/backlog.md b/cratedb_toolkit/wtf/backlog.md
new file mode 100644
index 00000000..cd6fc7e2
--- /dev/null
+++ b/cratedb_toolkit/wtf/backlog.md
@@ -0,0 +1,39 @@
+# cratedb-wtf backlog
+
+## Iteration +1
+- Display differences to the standard configuration
+- `tail -f` for `sys.jobs_log` and friends
+
+## Iteration +2
+- Make `cratedb-wtf logs` also optionally consider `sys.` tables.
+- cratedb-wtf explore table|shard|partition|node
+- High-level analysis, evaluating a set of threshold rules
+- High-level summary reports with heuristics support
+- Network diagnostics?
+
+## Iteration +3
+- Make it work with CrateDB Cloud.
+ ``` + ctk cluster info + ctk cluster health + ctk cluster logs --slow-queries + ``` + +## Iteration +4 +- Expose collected data via Glances-like UI +- Experimental UI using Grafana Scenes + +## Done +- Make it work +- Proper marshalling of timestamp values (ISO 8601) +- Expose collected data via HTTP API + ``` + cratedb-wtf serve + ``` +- Provide `scrub` option also via HTTP +- Complete collected queries and code snippets +- Harvest queries from Admin UI, crash, crate-admin-alt +- Harvest queries from experts + - https://tools.cr8.net/grafana/d/RkpNJx84z/cratedb-jobs-log?orgId=1&refresh=5m&var-datasource=crate-production + - https://tools.cr8.net/grafana/d/RkpNJx84z/cratedb-jobs-log?orgId=1&refresh=5m&var-datasource=crate-production&viewPanel=44 +- Add `description` and `unit` fields to each `InfoElement` definition diff --git a/cratedb_toolkit/wtf/cli.py b/cratedb_toolkit/wtf/cli.py new file mode 100644 index 00000000..5b2761db --- /dev/null +++ b/cratedb_toolkit/wtf/cli.py @@ -0,0 +1,188 @@ +# Copyright (c) 2021-2023, Crate.io Inc. +# Distributed under the terms of the AGPLv3 license, see LICENSE. +import logging +import os +import sys +import typing as t +import urllib.parse + +import click +from click_aliases import ClickAliasedGroup + +from cratedb_toolkit.util import DatabaseAdapter +from cratedb_toolkit.util.cli import ( + boot_click, + make_command, +) +from cratedb_toolkit.util.data import jd +from cratedb_toolkit.wtf.core import InfoContainer, JobInfoContainer, LogContainer + +logger = logging.getLogger(__name__) + + +def help_info(): + """ + Database cluster and system information. + + Synopsis + ======== + + export CRATEDB_SQLALCHEMY_URL=crate://localhost/ + cratedb-wtf info + + """ # noqa: E501 + + +def help_logs(): + """ + Database cluster logs. + + Synopsis + ======== + + export CRATEDB_SQLALCHEMY_URL=crate://localhost/ + cratedb-wtf logs + + """ # noqa: E501 + + +def help_statistics(): + """ + Database cluster job / query statistics. + + Synopsis + ======== + + export CRATEDB_SQLALCHEMY_URL=crate://localhost/ + cratedb-wtf job-statistics quick + cratedb-wtf job-statistics collect + cratedb-wtf job-statistics view + + """ # noqa: E501 + + +def help_serve(): + """ + Start HTTP service to expose collected information. + + Synopsis + ======== + + export CRATEDB_SQLALCHEMY_URL=crate://localhost/ + cratedb-wtf serve + + """ # noqa: E501 + + +cratedb_sqlalchemy_option = click.option( + "--cratedb-sqlalchemy-url", envvar="CRATEDB_SQLALCHEMY_URL", type=str, required=False, help="CrateDB SQLAlchemy URL" +) + + +@click.group(cls=ClickAliasedGroup) # type: ignore[arg-type] +@cratedb_sqlalchemy_option +@click.option("--verbose", is_flag=True, required=False, help="Turn on logging") +@click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level") +@click.option("--scrub", envvar="SCRUB", is_flag=True, required=False, help="Blank out identifiable information") +@click.version_option() +@click.pass_context +def cli(ctx: click.Context, cratedb_sqlalchemy_url: str, verbose: bool, debug: bool, scrub: bool): + """ + Diagnostics and informational utilities. 
+ """ + if not cratedb_sqlalchemy_url: + logger.error("Unable to operate without database address") + sys.exit(1) + ctx.meta.update({"cratedb_sqlalchemy_url": cratedb_sqlalchemy_url, "scrub": scrub}) + return boot_click(ctx, verbose, debug) + + +@make_command(cli, "info", help_info) +@click.pass_context +def info(ctx: click.Context): + cratedb_sqlalchemy_url = ctx.meta["cratedb_sqlalchemy_url"] + scrub = ctx.meta.get("scrub", False) + adapter = DatabaseAdapter(dburi=cratedb_sqlalchemy_url) + sample = InfoContainer(adapter=adapter, scrub=scrub) + jd(sample.to_dict()) + + +@make_command(cli, "logs", help_logs) +@click.pass_context +def logs(ctx: click.Context): + cratedb_sqlalchemy_url = ctx.meta["cratedb_sqlalchemy_url"] + scrub = ctx.meta.get("scrub", False) + adapter = DatabaseAdapter(dburi=cratedb_sqlalchemy_url) + sample = LogContainer(adapter=adapter, scrub=scrub) + jd(sample.to_dict()) + + +@make_command(cli, "job-info", "Display information about jobs / queries.") +@click.pass_context +def job_information(ctx: click.Context): + """ + Display ad hoc job information. + """ + cratedb_sqlalchemy_url = ctx.meta["cratedb_sqlalchemy_url"] + scrub = ctx.meta.get("scrub", False) + adapter = DatabaseAdapter(dburi=cratedb_sqlalchemy_url) + sample = JobInfoContainer(adapter=adapter, scrub=scrub) + jd(sample.to_dict()) + + +@click.group(cls=ClickAliasedGroup) # type: ignore[arg-type] +@click.pass_context +def job_statistics(ctx: click.Context): + """ + Collect and display statistics about jobs / queries. + """ + pass + + +cli.add_command(job_statistics, name="job-statistics", aliases=["jobstats"]) + + +@make_command(job_statistics, "collect", "Collect queries from sys.jobs_log.") +@click.pass_context +def job_statistics_collect(ctx: click.Context): + """ + Run jobs_log collector. + + # TODO: Forward `cratedb_sqlalchemy_url` properly. + """ + import cratedb_toolkit.wtf.query_collector + + cratedb_toolkit.wtf.query_collector.main() + + +@make_command(job_statistics, "view", "View job statistics about collected queries.") +@click.pass_context +def job_statistics_view(ctx: click.Context): + """ + View job statistics about collected queries. + + # TODO: Forward `cratedb_sqlalchemy_url` properly. + """ + cratedb_sqlalchemy_url = ctx.meta["cratedb_sqlalchemy_url"] + url = urllib.parse.urlparse(cratedb_sqlalchemy_url) + hostname = f"{url.hostname}:{url.port or 4200}" + os.environ["HOSTNAME"] = hostname + + import cratedb_toolkit.wtf.query_collector + + cratedb_toolkit.wtf.query_collector.init() + + response: t.Dict = {"meta": {}, "data": {}} + response["meta"]["remark"] = "WIP! This is a work in progress. The output format will change." + response["data"]["stats"] = cratedb_toolkit.wtf.query_collector.read_stats() + jd(response) + + +@make_command(cli, "serve", help_serve) +@click.option("--listen", type=click.STRING, default=None, help="HTTP server listen address") +@click.option("--reload", is_flag=True, help="Dynamically reload changed files") +@click.pass_context +def serve(ctx: click.Context, listen: str, reload: bool): + from cratedb_toolkit.wtf.http import start + + start(listen, reload=reload) diff --git a/cratedb_toolkit/wtf/core.py b/cratedb_toolkit/wtf/core.py new file mode 100644 index 00000000..a69fcb0b --- /dev/null +++ b/cratedb_toolkit/wtf/core.py @@ -0,0 +1,81 @@ +# Copyright (c) 2021-2024, Crate.io Inc. +# Distributed under the terms of the AGPLv3 license, see LICENSE. 
+import boltons.ecoutils + +from cratedb_toolkit.util.platform import PlatformInfo +from cratedb_toolkit.wtf.library import Library +from cratedb_toolkit.wtf.model import InfoContainerBase + + +class InfoContainer(InfoContainerBase): + def register_builtins(self): + self.elements.add( + # General cluster health information. + Library.Health.cluster_name, + Library.Health.nodes_count, + Library.Health.nodes_list, + Library.Health.table_health, + Library.Health.backups_recent, + # Shard / node / partition allocation and rebalancing information. + Library.Shards.allocation, + Library.Shards.table_allocation, + Library.Shards.node_shard_distribution, + Library.Shards.table_shard_count, + Library.Shards.rebalancing_progress, + Library.Shards.rebalancing_status, + Library.Shards.not_started, + Library.Shards.not_started_count, + Library.Shards.max_checkpoint_delta, + Library.Shards.total_count, + Library.Shards.translog_uncommitted, + Library.Shards.translog_uncommitted_size, + ) + + def to_dict(self, data=None): + return super().to_dict(data={"system": self.system(), "database": self.database()}) + + def system(self): + data = {} + data["remark"] = ( + "This section includes system information about the machine running CrateDB " + 'Toolkit, effectively about the "compute" domain.' + ) + data["application"] = PlatformInfo.application() + data["eco"] = boltons.ecoutils.get_profile(scrub=self.scrub) + # data["libraries"] = PlatformInfo.libraries() # noqa: ERA001 + return data + + def database(self): + data = {} + data["remark"] = ( + "This section includes system and other diagnostics information about the CrateDB " + 'database cluster, effectively about the "storage" domain.' + ) + for element in self.elements.items: + data[element.name] = self.evaluate_element(element) + return data + + +class LogContainer(InfoContainerBase): + def register_builtins(self): + self.elements.add( + Library.Logs.user_queries_latest, + ) + + +class JobInfoContainer(InfoContainerBase): + def register_builtins(self): + self.elements.add( + Library.JobInfo.age_range, + Library.JobInfo.by_user, + Library.JobInfo.duration_buckets, + Library.JobInfo.duration_percentiles, + Library.JobInfo.history100, + Library.JobInfo.history_count, + Library.JobInfo.performance15min, + Library.JobInfo.running, + Library.JobInfo.running_count, + Library.JobInfo.top100_count, + Library.JobInfo.top100_duration_individual, + Library.JobInfo.top100_duration_total, + ) diff --git a/cratedb_toolkit/wtf/http.py b/cratedb_toolkit/wtf/http.py new file mode 100644 index 00000000..1d70a4a7 --- /dev/null +++ b/cratedb_toolkit/wtf/http.py @@ -0,0 +1,41 @@ +# Copyright (c) 2021-2023, Crate.io Inc. +# Distributed under the terms of the AGPLv3 license, see LICENSE. 
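# Annotation (not part of this patch): with the service running via
# `cratedb-wtf serve`, the `/info/all` endpoint defined below can be consumed
# with the standard library alone; the default listen address 127.0.0.1:4242
# comes from `cratedb_toolkit/util/service.py`.
#
#   import json
#   import urllib.request
#
#   with urllib.request.urlopen("http://127.0.0.1:4242/info/all") as response:
#       outcome = json.load(response)
#   print(list(outcome["data"].keys()))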
+import logging
+import os
+import typing as t
+from functools import lru_cache
+
+from fastapi import Depends, FastAPI, HTTPException
+
+from cratedb_toolkit.util import DatabaseAdapter
+from cratedb_toolkit.util.service import start_service
+from cratedb_toolkit.wtf.core import InfoContainer
+from cratedb_toolkit.wtf.util import get_baseinfo
+
+logger = logging.getLogger(__name__)
+
+app = FastAPI()
+
+
+@lru_cache
+def database_adapter() -> DatabaseAdapter:
+    # TODO: return config.Settings()
+    cratedb_sqlalchemy_url = os.environ["CRATEDB_SQLALCHEMY_URL"]
+    return DatabaseAdapter(dburi=cratedb_sqlalchemy_url)
+
+
+@app.get("/")
+def read_root():
+    return get_baseinfo()
+
+
+@app.get("/info/{category}")
+def info(category: str, adapter: t.Annotated[DatabaseAdapter, Depends(database_adapter)], scrub: bool = False):  # type: ignore[name-defined]
+    if category != "all":
+        raise HTTPException(status_code=404, detail="Info category not found")
+    sample = InfoContainer(adapter=adapter, scrub=scrub)
+    return sample.to_dict()
+
+
+def start(listen_address: t.Union[str, None] = None, reload: bool = False):  # pragma: no cover
+    start_service(app="cratedb_toolkit.wtf.http:app", listen_address=listen_address, reload=reload)
diff --git a/cratedb_toolkit/wtf/library.py b/cratedb_toolkit/wtf/library.py
new file mode 100644
index 00000000..b2be3697
--- /dev/null
+++ b/cratedb_toolkit/wtf/library.py
@@ -0,0 +1,601 @@
+# Copyright (c) 2021-2024, Crate.io Inc.
+# Distributed under the terms of the AGPLv3 license, see LICENSE.
+from cratedb_toolkit.wtf.model import InfoElement, LogElement
+from cratedb_toolkit.wtf.util import get_single_value
+
+
+class Library:
+    """
+    A collection of SQL queries and utilities suitable for diagnostics on CrateDB.
+
+    Credits to the many authors and contributors of CrateDB diagnostics utilities,
+    dashboards, and cheat sheets.
+
+    Acknowledgements: Baurzhan Sakhariev, Eduardo Legatti, Georg Traar, Hernan
+    Cianfagna, Ivan Sanchez Valencia, Karyn Silva de Azevedo, Niklas Schmidtmer,
+    Walter Behmann.
+
+    References:
+    - https://community.cratedb.com/t/similar-elasticsearch-commands/1455/4
+    - CrateDB Admin UI.
+    - CrateDB Grafana General Diagnostics Dashboard.
+    - Debugging CrateDB - Queries Cheat Sheet.
+    """
+
+    class Health:
+        """
+        CrateDB health check queries.
+        """
+
+        backups_recent = InfoElement(
+            name="backups_recent",
+            label="Recent Backups",
+            sql="""
+                SELECT repository, name, finished, state
+                FROM sys.snapshots
+                ORDER BY finished DESC
+                LIMIT 10;
+            """,
+            description="Most recent 10 backups",
+        )
+
+        cluster_name = InfoElement(
+            name="cluster_name",
+            label="Cluster name",
+            sql=r"SELECT name FROM sys.cluster;",
+            transform=get_single_value("name"),
+        )
+
+        nodes_count = InfoElement(
+            name="cluster_nodes_count",
+            label="Total number of cluster nodes",
+            sql=r"SELECT COUNT(*) AS count FROM sys.nodes;",
+            transform=get_single_value("count"),
+        )
+        nodes_list = InfoElement(
+            name="cluster_nodes_list",
+            label="Cluster Nodes",
+            sql="SELECT * FROM sys.nodes ORDER BY hostname;",
+            description="Telemetry information for all cluster nodes.",
+        )
+        table_health = InfoElement(
+            name="table_health",
+            label="Table Health",
+            sql="SELECT health, COUNT(*) AS table_count FROM sys.health GROUP BY health;",
+            description="Table health short summary",
+        )
+
+    class JobInfo:
+        """
+        Information distilled from `sys.jobs_log` and `sys.jobs`.
+ """ + + age_range = InfoElement( + name="age_range", + label="Query age range", + description="Timestamps of first and last job", + sql=""" + SELECT + MIN(started) AS "first_job", + MAX(started) AS "last_job" + FROM sys.jobs_log; + """, + ) + by_user = InfoElement( + name="by_user", + label="Queries by user", + sql=r""" + SELECT + username, + COUNT(username) AS count + FROM sys.jobs_log + GROUP BY username + ORDER BY count DESC; + """, + description="Total number of queries per user.", + ) + + duration_buckets = InfoElement( + name="duration_buckets", + label="Query Duration Distribution (Buckets)", + sql=""" + WITH dur AS ( + SELECT + ended-started::LONG AS duration + FROM sys.jobs_log + ), + pct AS ( + SELECT + [0.25,0.5,0.75,0.99,0.999,1] pct_in, + percentile(duration,[0.25,0.5,0.75,0.99,0.999,1]) as pct, + count(*) cnt + FROM dur + ) + SELECT + UNNEST(pct_in) * 100 AS bucket, + cnt - CEIL(UNNEST(pct_in) * cnt) AS count, + CEIL(UNNEST(pct)) duration + ---cnt + FROM pct; + """, + description="Distribution of query durations, bucketed.", + ) + duration_percentiles = InfoElement( + name="duration_percentiles", + label="Query Duration Distribution (Percentiles)", + sql=""" + SELECT + min(ended-started::LONG) AS min, + percentile(ended-started::LONG, 0.50) AS p50, + percentile(ended-started::LONG, 0.90) AS p90, + percentile(ended-started::LONG, 0.99) AS p99, + MAX(ended-started::LONG) AS max + FROM + sys.jobs_log + LIMIT 50; + """, + description="Distribution of query durations, percentiles.", + ) + history100 = InfoElement( + name="history", + label="Query History", + sql=""" + SELECT + started AS "time", + stmt, + (ended::LONG - started::LONG) AS duration, + username + FROM sys.jobs_log + WHERE stmt NOT ILIKE '%snapshot%' + ORDER BY time DESC + LIMIT 100; + """, + transform=lambda x: list(reversed(x)), + description="Statements and durations of the 100 recent queries / jobs.", + ) + history_count = InfoElement( + name="history_count", + label="Query History Count", + sql=""" + SELECT + COUNT(*) AS job_count + FROM + sys.jobs_log; + """, + transform=get_single_value("job_count"), + description="Total number of queries on this node.", + ) + performance15min = InfoElement( + name="performance15min", + label="Query performance 15min", + sql=r""" + SELECT + CURRENT_TIMESTAMP AS last_timestamp, + (ended / 10000) * 10000 + 5000 AS ended_time, + COUNT(*) / 10.0 AS qps, + TRUNC(AVG(ended::BIGINT - started::BIGINT), 2) AS duration, + UPPER(regexp_matches(stmt,'^\s*(\w+).*')[1]) AS query_type + FROM + sys.jobs_log + WHERE + ended > now() - ('15 minutes')::INTERVAL + GROUP BY 1, 2, 5 + ORDER BY ended_time ASC; + """, + description="The query performance within the last 15 minutes, including two metrics: " + "queries per second, and query speed (ms).", + ) + running = InfoElement( + name="running", + label="Currently Running Queries", + sql=""" + SELECT + started AS "time", + stmt, + (CURRENT_TIMESTAMP::LONG - started::LONG) AS duration, + username + FROM sys.jobs + WHERE stmt NOT ILIKE '%snapshot%' + ORDER BY time; + """, + description="Statements and durations of currently running queries / jobs.", + ) + running_count = InfoElement( + name="running_count", + label="Number of running queries", + sql=""" + SELECT + COUNT(*) AS job_count + FROM + sys.jobs; + """, + transform=get_single_value("job_count"), + description="Total number of currently running queries.", + ) + top100_count = InfoElement( + name="top100_count", + label="Query frequency", + description="The 100 most frequent queries.", + 
sql="""
+                SELECT
+                    stmt,
+                    COUNT(stmt) AS stmt_count,
+                    MAX((ended::LONG - started::LONG)) AS max_duration,
+                    MIN((ended::LONG - started::LONG)) AS min_duration,
+                    AVG((ended::LONG - started::LONG)) AS avg_duration,
+                    PERCENTILE((ended::LONG - started::LONG), 0.99) AS p99
+                FROM sys.jobs_log
+                GROUP BY stmt
+                ORDER BY stmt_count DESC
+                LIMIT 100;
+            """,
+        )
+        top100_duration_individual = InfoElement(
+            name="top100_duration_individual",
+            label="Individual Query Duration",
+            description="The 100 queries by individual duration.",
+            sql="""
+                SELECT
+                    (ended::LONG - started::LONG) AS duration,
+                    stmt
+                FROM sys.jobs_log
+                ORDER BY duration DESC
+                LIMIT 100;
+            """,
+            unit="ms",
+        )
+        top100_duration_total = InfoElement(
+            name="top100_duration_total",
+            label="Total Query Duration",
+            description="The 100 queries by total duration.",
+            sql="""
+                SELECT
+                    SUM(ended::LONG - started::LONG) AS total_duration,
+                    stmt,
+                    COUNT(stmt) AS stmt_count
+                FROM sys.jobs_log
+                GROUP BY stmt
+                ORDER BY total_duration DESC
+                LIMIT 100;
+            """,
+            unit="ms",
+        )
+
+    class Logs:
+        """
+        Access `sys.jobs_log` for logging purposes.
+        """
+
+        """
+        TODO: Implement `tail` in one way or another.
+        -- https://stackoverflow.com/q/4714975
+
+        @seut says:
+        why? whats the issue with sorting it desc by ended? As the table will be computed by results of
+        all nodes inside the cluster, the natural ordering might not be deterministic.
+
+        Ideas::
+
+            SELECT * FROM sys.jobs_log OFFSET -10;
+            SELECT * FROM sys.jobs_log OFFSET (SELECT count(*) FROM sys.jobs_log)-10;
+
+        - https://cratedb.com/docs/crate/reference/en/latest/general/builtins/scalar-functions.html#to-char-expression-format-string
+        - https://cratedb.com/docs/crate/reference/en/latest/general/builtins/scalar-functions.html#date-format-format-string-timezone-timestamp
+        """
+
+        user_queries_latest = LogElement(
+            name="user_queries_latest",
+            label="Latest User Queries",
+            sql=r"""
+                SELECT
+                    DATE_FORMAT('%Y-%m-%dT%H:%i:%s.%f', started) AS started,
+                    DATE_FORMAT('%Y-%m-%dT%H:%i:%s.%f', ended) AS ended,
+                    classification, stmt, username, node
+                FROM
+                    sys.jobs_log
+                WHERE
+                    stmt NOT LIKE '%sys.%' AND
+                    stmt NOT LIKE '%information_schema.%'
+                ORDER BY ended DESC
+                LIMIT {limit};
+            """,
+        )
+
+    class Replication:
+        """
+        Information about logical replication.
+        """
+
+        # https://github.com/crate/crate/blob/master/docs/admin/logical-replication.rst#monitoring
+        subscriptions = """
+            SELECT s.subname, s.subpublications, sr.srrelid::text, sr.srsubstate, sr.srsubstate_reason
+            FROM pg_subscription s
+            JOIN pg_subscription_rel sr ON s.oid = sr.srsubid
+            ORDER BY s.subname;
+        """
+
+    class Resources:
+        """
+        About system resources.
+        """
+
+        # TODO: Needs templating.
+        column_cardinality = """
+            SELECT tablename, attname, n_distinct
+            FROM pg_stats
+            WHERE schemaname = '...'
+            AND tablename IN (...)
+            AND attname IN (...);
+        """
+
+        file_descriptors = """
+            SELECT
+                name AS node_name,
+                process['open_file_descriptors'] AS "open_file_descriptors",
+                process['max_open_file_descriptors'] AS max_open_file_descriptors
+            FROM sys.nodes
+            ORDER BY node_name;
+        """
+
+        heap_usage = """
+            SELECT
+                name AS node_name,
+                heap['used'] / heap['max']::DOUBLE AS heap_used
+            FROM sys.nodes
+            ORDER BY node_name;
+        """
+
+        tcp_connections = """
+            SELECT
+                name AS node_name,
+                connections
+            FROM sys.nodes
+            ORDER BY node_name;
+        """
+
+        # TODO: Q: Why "14"? Is it about only getting information about the `write` thread pool?
+ # A: Yes, the `write` thread pool will be exposed as the last entry inside this array. + # But this may change in future. + thread_pools = """ + SELECT + name AS node_name, + thread_pools[14]['queue'], + thread_pools[14]['active'], + thread_pools[14]['threads'] + FROM sys.nodes + ORDER BY node_name; + """ + + class Settings: + """ + Reflect cluster settings. + """ + + info = """ + SELECT + name, + master_node, + settings['cluster']['routing']['allocation']['cluster_concurrent_rebalance'] + AS cluster_concurrent_rebalance, + settings['indices']['recovery']['max_bytes_per_sec'] AS max_bytes_per_sec + FROM sys.cluster + LIMIT 1; + """ + + class Shards: + """ + Information about shard / node / table / partition allocation and rebalancing. + """ + + # https://cratedb.com/docs/crate/reference/en/latest/admin/system-information.html#example + # TODO: Needs templating. + for_table = """ + SELECT + schema_name, + table_name, + id, + partition_ident, + num_docs, + primary, + relocating_node, + routing_state, + state, + orphan_partition + FROM sys.shards + WHERE schema_name = '{schema_name}' AND table_name = '{table_name}'; + """ + + # Identify the location of the shards for each partition. + # TODO: Needs templating. + location_for_partition = """ + SELECT table_partitions.table_schema, + table_partitions.table_name, + table_partitions.values[{partition_column}]::TIMESTAMP, + shards.primary, + shards.node['name'] + FROM sys.shards + JOIN information_schema.table_partitions ON shards.partition_ident=table_partitions.partition_ident + WHERE table_partitions.table_name = {table_name} + ORDER BY 1,2,3,4,5; + """ + + allocation = InfoElement( + name="shard_allocation", + sql=""" + SELECT + IF(s.primary = TRUE, 'primary', 'replica') AS shard_type, + COALESCE(shards, 0) AS shards + FROM + UNNEST([true, false]) s(primary) + LEFT JOIN ( + SELECT primary, COUNT(*) AS shards + FROM sys.allocations + WHERE current_state != 'STARTED' + GROUP BY 1 + ) a ON s.primary = a.primary; + """, + label="Shard Allocation", + description="Support identifying issues with shard allocation.", + ) + + max_checkpoint_delta = InfoElement( + name="max_checkpoint_delta", + sql=""" + SELECT + COALESCE(MAX(seq_no_stats['local_checkpoint'] - seq_no_stats['global_checkpoint']), 0) + AS max_checkpoint_delta + FROM sys.shards; + """, + transform=get_single_value("max_checkpoint_delta"), + label="Delta between local and global checkpoint", + description="If the delta between the local and global checkpoint is significantly large, " + "shard replication might have stalled or slowed down.", + ) + + # data-hot-2 262 + # data-hot-1 146 + node_shard_distribution = InfoElement( + name="node_shard_distribution", + label="Shard Distribution", + sql=""" + SELECT + node['name'] AS node_name, + COUNT(*) AS num_shards + FROM sys.shards + WHERE primary = true + GROUP BY node_name; + """, + description="Shard distribution across nodes.", + ) + + not_started = InfoElement( + name="shard_not_started", + label="Shards not started", + sql=""" + SELECT * + FROM sys.allocations + WHERE current_state != 'STARTED'; + """, + description="Information about shards which have not been started.", + ) + not_started_count = InfoElement( + name="shard_not_started_count", + label="Number of shards not started", + description="Total number of shards which have not been started.", + sql=""" + SELECT COUNT(*) AS not_started_count + FROM sys.allocations + WHERE current_state != 'STARTED'; + """, + transform=get_single_value("not_started_count"), + ) + + 
rebalancing_progress = InfoElement(
+            name="shard_rebalancing_progress",
+            label="Shard Rebalancing Progress",
+            sql="""
+                SELECT
+                    table_name,
+                    schema_name,
+                    recovery['stage'] AS recovery_stage,
+                    AVG(recovery['size']['percent']) AS progress,
+                    COUNT(*) AS count
+                FROM
+                    sys.shards
+                GROUP BY table_name, schema_name, recovery_stage;
+            """,
+            description="Information about rebalancing progress.",
+        )
+
+        rebalancing_status = InfoElement(
+            name="shard_rebalancing_status",
+            label="Shard Rebalancing Status",
+            sql="""
+                SELECT node['name'], id, recovery['stage'], recovery['size']['percent'], routing_state, state
+                FROM sys.shards
+                WHERE routing_state IN ('INITIALIZING', 'RELOCATING')
+                ORDER BY id;
+            """,
+            description="Information about rebalancing activities.",
+        )
+
+        table_allocation = InfoElement(
+            name="table_allocation",
+            label="Table Allocations",
+            sql="""
+                SELECT
+                    table_schema, table_name, node_id, shard_id, partition_ident, current_state, decisions, explanation
+                FROM
+                    sys.allocations;
+            """,
+            description="Table allocation across nodes, shards, and partitions.",
+        )
+
+        table_allocation_special = InfoElement(
+            name="table_allocation_special",
+            label="Table Allocations Special",
+            sql="""
+                SELECT decisions[2]['node_name'] AS node_name, COUNT(*) AS table_count
+                FROM sys.allocations
+                GROUP BY decisions[2]['node_name'];
+            """,
+            description="Table allocation. Special.",
+        )
+
+        table_shard_count = InfoElement(
+            name="table_shard_count",
+            label="Table Shard Count",
+            sql="""
+                SELECT
+                    table_schema,
+                    table_name,
+                    SUM(number_of_shards) AS num_shards
+                FROM
+                    information_schema.table_partitions
+                WHERE
+                    closed = false
+                GROUP BY table_schema, table_name;
+            """,
+            description="Total number of shards per table.",
+        )
+
+        total_count = InfoElement(
+            name="shard_total_count",
+            label="Number of shards",
+            description="Total number of shards.",
+            sql="""
+                SELECT COUNT(*) AS shard_count
+                FROM sys.shards
+            """,
+            transform=get_single_value("shard_count"),
+        )
+
+        # TODO: Are both `translog_uncommitted` items sensible?
+        translog_uncommitted = InfoElement(
+            name="translog_uncommitted",
+            label="Uncommitted Translog",
+            description="Check if translogs are committed properly by comparing the "
+            "`flush_threshold_size` with the `uncommitted_size` of a shard.",
+            sql="""
+                SELECT
+                    sh.table_name,
+                    sh.partition_ident,
+                    SUM(sh.translog_stats['uncommitted_size']) / POWER(1024, 3) AS "translog_uncommitted_in_gib"
+                FROM information_schema.table_partitions tp
+                JOIN sys.shards sh USING (table_name, partition_ident)
+                WHERE sh.translog_stats['uncommitted_size'] > settings['translog']['flush_threshold_size']
+                GROUP BY 1, 2
+                ORDER BY 3 DESC;
+            """,
+        )
+        translog_uncommitted_size = InfoElement(
+            name="translog_uncommitted_size",
+            label="Total uncommitted translog size",
+            description="A large number of uncommitted translog operations can indicate issues with shard replication.",
+            sql="""
+                SELECT COALESCE(SUM(translog_stats['uncommitted_size']), 0) AS translog_uncommitted_size
+                FROM sys.shards;
+            """,
+            transform=get_single_value("translog_uncommitted_size"),
+            unit="bytes",
+        )
diff --git a/cratedb_toolkit/wtf/model.py b/cratedb_toolkit/wtf/model.py
new file mode 100644
index 00000000..27739adc
--- /dev/null
+++ b/cratedb_toolkit/wtf/model.py
@@ -0,0 +1,86 @@
+# Copyright (c) 2021-2024, Crate.io Inc.
+# Distributed under the terms of the AGPLv3 license, see LICENSE.
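# Annotation (not part of this patch): `InfoElement` below is a plain
# dataclass, so ad hoc diagnostics can be defined next to the built-in ones
# from `cratedb_toolkit/wtf/library.py`. A hypothetical example:
#
#   from cratedb_toolkit.wtf.model import InfoElement
#
#   user_tables_count = InfoElement(
#       name="user_tables_count",
#       label="Number of user tables",
#       sql="SELECT COUNT(*) AS count FROM information_schema.tables "
#           "WHERE table_schema NOT IN ('sys', 'information_schema', 'pg_catalog', 'blob')",
#       transform=lambda records: records[0]["count"],
#   )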
+import dataclasses +import typing as t +from abc import abstractmethod + +from cratedb_toolkit.util import DatabaseAdapter +from cratedb_toolkit.wtf.util import get_baseinfo + + +@dataclasses.dataclass +class InfoElement: + name: str + label: str + sql: str + description: t.Union[str, None] = None + transform: t.Union[t.Callable, None] = None + unit: t.Union[str, None] = None + + def to_dict(self): + data = dataclasses.asdict(self) + data["sql"] = data["sql"].strip() + data["transform"] = str(data["transform"]) + return data + + +@dataclasses.dataclass +class LogElement(InfoElement): + limit: int = 100 + + +@dataclasses.dataclass +class ElementStore: + items: t.List[InfoElement] = dataclasses.field(default_factory=list) + index: t.Dict[str, InfoElement] = dataclasses.field(default_factory=dict) + + def add(self, *elements: InfoElement): + for element in elements: + self.items.append(element) + if element.name in self.index: + raise KeyError(f"Duplicate key/label: {element.name}") + self.index[element.name] = element + + +class InfoContainerBase: + def __init__(self, adapter: DatabaseAdapter, scrub: bool = False): + self.adapter = adapter + self.scrub = scrub + self.elements = ElementStore() + self.register_builtins() + + @abstractmethod + def register_builtins(self): + raise NotImplementedError("Method needs to be implemented by child class") + + def metadata(self): + data = {} + data.update(get_baseinfo()) + data["elements"] = {} + for element in self.elements.items: + data["elements"][element.name] = element.to_dict() + return data + + def evaluate_element(self, element: InfoElement): + sql = element.sql + if isinstance(element, LogElement): + sql = sql.format(limit=element.limit) + results = self.adapter.run_sql(sql, records=True) + if element.transform is not None: + results = element.transform(results) + return results + + def to_dict(self, data=None): + if data is None: + data = self.render() + return {"meta": self.metadata(), "data": data} + + def render(self): + data = {} + for element in self.elements.items: + data[element.name] = self.evaluate_element(element) + return data + + # FIXME + def by_table(self, schema: str, table: str): + raise NotImplementedError("Please implement InfoContainerBase.by_table") diff --git a/cratedb_toolkit/wtf/query_collector.py b/cratedb_toolkit/wtf/query_collector.py new file mode 100644 index 00000000..b630c977 --- /dev/null +++ b/cratedb_toolkit/wtf/query_collector.py @@ -0,0 +1,247 @@ +# Copyright (c) 2021-2024, Crate.io Inc. +# Distributed under the terms of the AGPLv3 license, see LICENSE. 
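# Annotation (not part of this patch): how the duration histogram below
# behaves. `assign_to_bucket()` increments the first bucket whose upper bound
# exceeds the duration (in milliseconds), falling back to "INF". Note that
# importing the module sets up a database client from environment variables.
#
#   from cratedb_toolkit.wtf.query_collector import assign_to_bucket, bucket_dict
#
#   bucket = dict(bucket_dict)       # fresh counters, all zero
#   assign_to_bucket(bucket, 42)     # 42 < 50   -> bucket["50"] == 1
#   assign_to_bucket(bucket, 99999)  # >= 20000  -> bucket["INF"] == 1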
+ +# ruff: noqa: S608 +import json +import logging +import os +import time +from uuid import uuid4 + +import urllib3 +from crate import client + +logger = logging.getLogger(__name__) + +host = os.getenv("HOSTNAME", "localhost:4200") +username = os.getenv("USERNAME", "crate") +password = os.getenv("PASSWORD", "") +interval = float(os.getenv("INTERVAL", 10)) +stmt_log_table = os.getenv("STMT_TABLE", "stats.statement_log") +last_exec_table = os.getenv("LAST_EXEC_TABLE", "stats.last_execution") +last_execution_ts = 0 +sys_jobs_log = {} +bucket_list = [10, 50, 100, 500, 1000, 2000, 5000, 10000, 15000, 20000] +bucket_dict = { + "10": 0, + "50": 0, + "100": 0, + "500": 0, + "1000": 0, + "2000": 0, + "5000": 0, + "10000": 0, + "15000": 0, + "20000": 0, + "INF": 0, +} + + +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) +conn = client.connect(host, username=username, password=password) +cursor = conn.cursor() +last_scrape = int(time.time() * 1000) - (interval * 60000) + +TRACING = False + + +def init(): + stmt = ( + f"CREATE TABLE IF NOT EXISTS {stmt_log_table} " + f"(id TEXT, stmt TEXT, calls INT, bucket OBJECT, last_used TIMESTAMP, " + f"username TEXT, query_type TEXT, avg_duration FLOAT, nodes ARRAY(TEXT))" + ) + cursor.execute(stmt) + stmt = f"SELECT id, stmt, calls, bucket, username, query_type, avg_duration, nodes, last_used FROM {stmt_log_table}" + cursor.execute(stmt) + init_stmts(cursor.fetchall()) + stmt = f"CREATE TABLE IF NOT EXISTS {last_exec_table} (last_execution TIMESTAMP)" + cursor.execute(stmt) + stmt = f"SELECT last_execution FROM {last_exec_table}" + cursor.execute(stmt) + init_last_execution(cursor.fetchall()) + + +def init_last_execution(last_execution): + global last_execution_ts + if len(last_execution) == 0: + last_execution_ts = 0 + stmt = f"INSERT INTO {last_exec_table} (last_execution) VALUES (?)" + cursor.execute(stmt, (0,)) + else: + last_execution_ts = last_execution[0][0] + + +def init_stmts(stmts): + for stmt in stmts: + stmt_id = stmt[0] + stmt_column = stmt[1] + calls = stmt[2] + bucket = stmt[3] + user = stmt[4] + stmt_type = stmt[5] + avg_duration = stmt[6] + nodes = stmt[7] + last_used = stmt[8] + + if stmt_column not in sys_jobs_log: + sys_jobs_log[stmt_column] = { + "id": stmt_id, + "size": 0, + "info": [], + "calls": calls, + "bucket": bucket, + "user": user, + "type": stmt_type, + "avg_duration": avg_duration, + "nodes": nodes, + "last_used": last_used, + "in_db": True, + "changed": False, + } + + +def write_stats_to_db(): + logger.info("Writing statistics to database") + write_query_stmt = ( + f"INSERT INTO {stmt_log_table} " + f"(id, stmt, calls, bucket, username, query_type, avg_duration, nodes, last_used) " + f"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" + ) + update_query_stmt = ( + f"UPDATE {stmt_log_table} " + f"SET calls = ?, avg_duration = ?, nodes = ?, bucket = ?, last_used = ? " + f"WHERE id = ?" 
+ ) + write_params = [] + for key in sys_jobs_log.keys(): + if not sys_jobs_log[key]["in_db"]: + write_params.append( + [ + sys_jobs_log[key]["id"], + key, + sys_jobs_log[key]["calls"], + sys_jobs_log[key]["bucket"], + sys_jobs_log[key]["user"], + sys_jobs_log[key]["type"], + sys_jobs_log[key]["avg_duration"], + sys_jobs_log[key]["nodes"], + sys_jobs_log[key]["last_used"], + ] + ) + sys_jobs_log[key]["in_db"] = True + sys_jobs_log[key]["changed"] = False + elif sys_jobs_log[key]["changed"]: + cursor.execute( + update_query_stmt, + ( + sys_jobs_log[key]["calls"], + sys_jobs_log[key]["avg_duration"], + sys_jobs_log[key]["nodes"], + sys_jobs_log[key]["bucket"], + sys_jobs_log[key]["last_used"], + sys_jobs_log[key]["id"], + ), + ) + sys_jobs_log[key]["changed"] = False + if len(write_params) > 0: + cursor.executemany(write_query_stmt, write_params) + + stmt = f"UPDATE {last_exec_table} SET last_execution = ?" + cursor.execute(stmt, (last_scrape,)) + + +def read_stats(): + stmt = ( + f"SELECT id, stmt, calls, avg_duration, bucket, username, query_type, nodes, last_used " + f"FROM {stmt_log_table} ORDER BY calls DESC, avg_duration DESC;" + ) + cursor.execute(stmt) + init_stmts(cursor.fetchall()) + return sys_jobs_log + + +def assign_to_bucket(bucket, duration): + found = False + for element in bucket_list: + if duration < element: + found = True + bucket[str(element)] += 1 + break + if not found: + bucket["INF"] += 1 + + return bucket + + +def update_statistics(query_results): + global sys_jobs_log + for result in query_results: + started = result[0] + ended = result[1] + classification = result[2] + stmt = result[3] + user = result[4] + node = json.dumps(result[5]) + + duration = ended - started + if stmt not in sys_jobs_log: + sys_jobs_log[stmt] = { + "id": str(uuid4()), + "calls": 0, + "bucket": dict(bucket_dict), + "user": user, + "type": classification["type"], + "avg_duration": duration, + "in_db": False, + "last_used": started, + "nodes": [], + "changed": True, + } + sys_jobs_log[stmt]["changed"] = True + sys_jobs_log[stmt]["avg_duration"] = (sys_jobs_log[stmt]["avg_duration"] + duration) / 2 + sys_jobs_log[stmt]["bucket"] = assign_to_bucket(sys_jobs_log[stmt]["bucket"], duration) + sys_jobs_log[stmt]["last_used"] = started + sys_jobs_log[stmt]["calls"] += 1 + sys_jobs_log[stmt]["nodes"].append(node) + sys_jobs_log[stmt]["nodes"] = list(set(sys_jobs_log[stmt]["nodes"])) # only save unique nodes + if TRACING: + logger.info(f"Updated statistics: {sys_jobs_log}") + + +def scrape_db(): + global last_scrape + logger.info("Reading sys.jobs_log") + next_scrape = int(time.time() * 1000) + stmt = ( + f"SELECT " + f"started, ended, classification, stmt, username, node " + f"FROM sys.jobs_log " + f"WHERE " + f"stmt NOT LIKE '%sys.%' AND " + f"stmt NOT LIKE '%information_schema.%' " + f"AND ended BETWEEN {last_scrape} AND {next_scrape} " + f"ORDER BY ended DESC" + ) + + cursor.execute(stmt) + result = cursor.fetchall() + update_statistics(result) + last_scrape = next_scrape + + +def run(): + scrape_db() + write_stats_to_db() + + +def main(): + init() + while True: + run() + logger.info(f"Sleeping for {interval} seconds") + time.sleep(interval) + + +if __name__ == "__main__": + main() diff --git a/cratedb_toolkit/wtf/util.py b/cratedb_toolkit/wtf/util.py new file mode 100644 index 00000000..983863f2 --- /dev/null +++ b/cratedb_toolkit/wtf/util.py @@ -0,0 +1,21 @@ +# Copyright (c) 2021-2023, Crate.io Inc. +# Distributed under the terms of the AGPLv3 license, see LICENSE. 
+import datetime as dt +import functools as ft +import typing as t + +from boltons.iterutils import get_path + +from cratedb_toolkit import __appname__, __version__ + + +def get_baseinfo(): + data: t.Dict[str, t.Union[str, dt.datetime]] = {} + data["system_time"] = dt.datetime.now() + data["application_name"] = __appname__ + data["application_version"] = __version__ + return data + + +def get_single_value(column_name: str): + return ft.partial(get_path, path=(0, column_name)) diff --git a/pyproject.toml b/pyproject.toml index 0dd3042b..cc2727b8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -90,12 +90,16 @@ dependencies = [ "colorama<1", "colorlog", "crash", - 'importlib-metadata; python_version <= "3.7"', + "fastapi<0.105", + 'importlib-metadata; python_version < "3.8"', + 'importlib-resources; python_version < "3.9"', + "polars<0.21", "python-dotenv<2", "python-slugify<9", "sqlalchemy-cratedb>=0.36.1,<1", "sqlparse<0.6", 'typing-extensions<5; python_version <= "3.7"', + "uvicorn<0.25", ] [project.optional-dependencies] all = [ @@ -167,6 +171,7 @@ repository = "https://github.com/crate-workbench/cratedb-toolkit" [project.scripts] cratedb-retention = "cratedb_toolkit.retention.cli:cli" cratedb-toolkit = "cratedb_toolkit.cli:cli" +cratedb-wtf = "cratedb_toolkit.wtf.cli:cli" ctk = "cratedb_toolkit.cli:cli" migr8 = "cratedb_toolkit.io.mongodb.cli:main" [project.entry-points.pytest11] diff --git a/tests/retention/test_cli.py b/tests/retention/test_cli.py index b100e50a..01b7487c 100644 --- a/tests/retention/test_cli.py +++ b/tests/retention/test_cli.py @@ -61,7 +61,7 @@ def test_setup_verbose(caplog, cratedb, settings): assert result.exit_code == 0 assert cratedb.database.table_exists(settings.policy_table.fullname) is True - assert 3 <= len(caplog.records) <= 10 + assert 3 <= len(caplog.records) <= 15 def test_setup_dryrun(caplog, cratedb, settings): diff --git a/tests/wtf/__init__.py b/tests/wtf/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/wtf/test_cli.py b/tests/wtf/test_cli.py new file mode 100644 index 00000000..b9620cd2 --- /dev/null +++ b/tests/wtf/test_cli.py @@ -0,0 +1,111 @@ +import json + +from boltons.iterutils import get_path +from click.testing import CliRunner + +from cratedb_toolkit.wtf.cli import cli + + +def test_wtf_cli_info(cratedb): + """ + Verify `cratedb-wtf info`. + """ + + # Invoke command. + runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb.database.dburi}) + result = runner.invoke( + cli, + args="info", + catch_exceptions=False, + ) + assert result.exit_code == 0 + + # Verify outcome. + info = json.loads(result.output) + assert "meta" in info + assert "data" in info + + system_keys = list(get_path(info, ("data", "system")).keys()) + database_keys = list(get_path(info, ("data", "database")).keys()) + assert system_keys == [ + "remark", + "application", + "eco", + # "libraries", + ] + assert "cluster_name" in database_keys + assert "cluster_nodes_count" in database_keys + + +def test_wtf_cli_logs(cratedb): + """ + Verify `cratedb-wtf logs`. + """ + + # Invoke command. + runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb.database.dburi}) + result = runner.invoke( + cli, + args="logs", + catch_exceptions=False, + ) + assert result.exit_code == 0 + + # Verify outcome. 
+ info = json.loads(result.output) + assert "meta" in info + assert "data" in info + + data_keys = list(info["data"].keys()) + assert "user_queries_latest" in data_keys + assert len(info["data"]["user_queries_latest"]) > 3 + + +def test_wtf_cli_job_info(cratedb): + """ + Verify `cratedb-wtf job-info`. + """ + + # Invoke command. + runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb.database.dburi}) + result = runner.invoke( + cli, + args="job-info", + catch_exceptions=False, + ) + assert result.exit_code == 0 + + # Verify outcome. + info = json.loads(result.output) + assert "meta" in info + assert "data" in info + + data_keys = list(info["data"].keys()) + assert "by_user" in data_keys + assert "top100_count" in data_keys + assert "top100_duration_individual" in data_keys + assert "top100_duration_total" in data_keys + assert "performance15min" in data_keys + + +def test_wtf_cli_statistics_view(cratedb): + """ + Verify `cratedb-wtf job-statistics view`. + """ + + # Invoke command. + runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb.database.dburi}) + result = runner.invoke( + cli, + args="job-statistics view", + catch_exceptions=False, + ) + assert result.exit_code == 0 + + # Verify outcome. + info = json.loads(result.output) + assert "meta" in info + assert "data" in info + + data_keys = list(info["data"].keys()) + assert "stats" in data_keys From 55941a1af020c27bb6b12b1b0c89b95348fb1c2d Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 2 Jan 2024 02:48:29 +0100 Subject: [PATCH 02/15] wtf: Add recorder for outcomes of `info` and `job-info` --- cratedb_toolkit/util/database.py | 7 ++-- cratedb_toolkit/wtf/README.md | 7 +++- cratedb_toolkit/wtf/cli.py | 13 +++++++- cratedb_toolkit/wtf/core.py | 6 +++- cratedb_toolkit/wtf/recorder.py | 57 ++++++++++++++++++++++++++++++++ 5 files changed, 83 insertions(+), 7 deletions(-) create mode 100644 cratedb_toolkit/wtf/recorder.py diff --git a/cratedb_toolkit/util/database.py b/cratedb_toolkit/util/database.py index 2667bcc9..6693adf5 100644 --- a/cratedb_toolkit/util/database.py +++ b/cratedb_toolkit/util/database.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023, Crate.io Inc. +# Copyright (c) 2023-2024, Crate.io Inc. # Distributed under the terms of the AGPLv3 license, see LICENSE. import io import os @@ -28,10 +28,9 @@ class DatabaseAdapter: Wrap SQLAlchemy connection to database. """ - def __init__(self, dburi: str): + def __init__(self, dburi: str, echo: bool = False): self.dburi = dburi - # TODO: Make `echo=True` configurable. - self.engine = sa.create_engine(self.dburi, echo=False) + self.engine = sa.create_engine(self.dburi, echo=echo) self.connection = self.engine.connect() def run_sql(self, sql: t.Union[str, Path, io.IOBase], records: bool = False, ignore: str = None): diff --git a/cratedb_toolkit/wtf/README.md b/cratedb_toolkit/wtf/README.md index 9f8c5777..92ba946f 100644 --- a/cratedb_toolkit/wtf/README.md +++ b/cratedb_toolkit/wtf/README.md @@ -26,12 +26,17 @@ Display database cluster log messages. cratedb-wtf logs ``` -Statistics. +Collect and display job statistics. ```shell cratedb-wtf job-statistics collect cratedb-wtf job-statistics view ``` +Record complete outcomes of `info` and `job-info`. +```shell +cratedb-wtf record +``` + ## HTTP API diff --git a/cratedb_toolkit/wtf/cli.py b/cratedb_toolkit/wtf/cli.py index 5b2761db..45c45cbc 100644 --- a/cratedb_toolkit/wtf/cli.py +++ b/cratedb_toolkit/wtf/cli.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2023, Crate.io Inc. +# Copyright (c) 2021-2024, Crate.io Inc. 
# Distributed under the terms of the AGPLv3 license, see LICENSE. import logging import os @@ -16,6 +16,7 @@ ) from cratedb_toolkit.util.data import jd from cratedb_toolkit.wtf.core import InfoContainer, JobInfoContainer, LogContainer +from cratedb_toolkit.wtf.recorder import InfoRecorder logger = logging.getLogger(__name__) @@ -178,6 +179,16 @@ def job_statistics_view(ctx: click.Context): jd(response) +@make_command(cli, "record", "Record `info` and `job-info` outcomes.") +@click.pass_context +def record(ctx: click.Context): + cratedb_sqlalchemy_url = ctx.meta["cratedb_sqlalchemy_url"] + scrub = ctx.meta.get("scrub", False) + adapter = DatabaseAdapter(dburi=cratedb_sqlalchemy_url, echo=False) + recorder = InfoRecorder(adapter=adapter, scrub=scrub) + recorder.record_forever() + + @make_command(cli, "serve", help_serve) @click.option("--listen", type=click.STRING, default=None, help="HTTP server listen address") @click.option("--reload", is_flag=True, help="Dynamically reload changed files") diff --git a/cratedb_toolkit/wtf/core.py b/cratedb_toolkit/wtf/core.py index a69fcb0b..77a7a784 100644 --- a/cratedb_toolkit/wtf/core.py +++ b/cratedb_toolkit/wtf/core.py @@ -1,5 +1,7 @@ # Copyright (c) 2021-2024, Crate.io Inc. # Distributed under the terms of the AGPLv3 license, see LICENSE. +import typing as t + import boltons.ecoutils from cratedb_toolkit.util.platform import PlatformInfo @@ -35,13 +37,15 @@ def to_dict(self, data=None): return super().to_dict(data={"system": self.system(), "database": self.database()}) def system(self): - data = {} + data: t.Dict[str, t.Any] = {} data["remark"] = ( "This section includes system information about the machine running CrateDB " 'Toolkit, effectively about the "compute" domain.' ) data["application"] = PlatformInfo.application() data["eco"] = boltons.ecoutils.get_profile(scrub=self.scrub) + # `version_info` is a list of mixed data types: [3, 11, 6, "final", 0]. + data["eco"]["python"]["version_info"] = str(data["eco"]["python"]["version_info"]) # data["libraries"] = PlatformInfo.libraries() # noqa: ERA001 return data diff --git a/cratedb_toolkit/wtf/recorder.py b/cratedb_toolkit/wtf/recorder.py new file mode 100644 index 00000000..3c90d477 --- /dev/null +++ b/cratedb_toolkit/wtf/recorder.py @@ -0,0 +1,57 @@ +# Copyright (c) 2023-2024, Crate.io Inc. +# Distributed under the terms of the AGPLv3 license, see LICENSE. +import logging +import threading +import time + +import sqlalchemy as sa + +from cratedb_toolkit.util import DatabaseAdapter +from cratedb_toolkit.wtf.core import InfoContainer, JobInfoContainer + +logger = logging.getLogger(__name__) + + +class InfoRecorder: + """ + Record complete outcomes of `info` and `job-info`. 
+ """ + + clusterinfo_table = "ext.clusterinfo" + jobinfo_table = "ext.jobinfo" + interval_seconds = 10 + + def __init__(self, adapter: DatabaseAdapter, scrub: bool = False): + self.adapter = adapter + self.scrub = scrub + + def record_once(self): + logger.info("Recording information snapshot") + clusterinfo_sample = InfoContainer(adapter=self.adapter, scrub=self.scrub) + jobinfo_sample = JobInfoContainer(adapter=self.adapter, scrub=self.scrub) + + for table, sample in ((self.clusterinfo_table, clusterinfo_sample), (self.jobinfo_table, jobinfo_sample)): + self.adapter.connection.execute( + sa.text( + f""" + CREATE TABLE IF NOT EXISTS {table} + (time TIMESTAMP DEFAULT NOW(), info OBJECT) + """ + ) + ) + self.adapter.connection.execute( + sa.text(f"INSERT INTO {table} (info) VALUES (:info)"), {"info": sample.to_dict()["data"]} # noqa: S608 + ) + + def record_forever(self): + logger.info(f"Starting to record information snapshot each {self.interval_seconds} seconds") + thread = threading.Thread(target=self.do_record_forever) + thread.start() + + def do_record_forever(self): + while True: + try: + self.record_once() + except Exception: + logger.exception("Failed to record information snapshot") + time.sleep(self.interval_seconds) From 74c2f52bcce550df7ef1277d496ff05505da839c Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 16 Apr 2024 19:57:52 +0200 Subject: [PATCH 03/15] Chore: Format code using most recent ruff and mypy --- cratedb_toolkit/retention/strategy/delete.py | 1 + cratedb_toolkit/retention/strategy/reallocate.py | 1 + cratedb_toolkit/retention/strategy/snapshot.py | 1 + cratedb_toolkit/wtf/recorder.py | 3 ++- examples/cloud_import.py | 1 + examples/retention_edit.py | 1 + examples/retention_retire_cutoff.py | 1 + pyproject.toml | 1 + 8 files changed, 9 insertions(+), 1 deletion(-) diff --git a/cratedb_toolkit/retention/strategy/delete.py b/cratedb_toolkit/retention/strategy/delete.py index f096666e..46e86eac 100644 --- a/cratedb_toolkit/retention/strategy/delete.py +++ b/cratedb_toolkit/retention/strategy/delete.py @@ -10,6 +10,7 @@ In CrateDB, tables for storing retention policies need to be created once manually. See the file setup/schema.sql in this repository. """ + import dataclasses import logging diff --git a/cratedb_toolkit/retention/strategy/reallocate.py b/cratedb_toolkit/retention/strategy/reallocate.py index fdd7c5ee..a8976a89 100644 --- a/cratedb_toolkit/retention/strategy/reallocate.py +++ b/cratedb_toolkit/retention/strategy/reallocate.py @@ -12,6 +12,7 @@ - Tables for storing retention policies need to be created once manually in CrateDB. See the file setup/schema.sql in this repository. """ + import dataclasses import logging diff --git a/cratedb_toolkit/retention/strategy/snapshot.py b/cratedb_toolkit/retention/strategy/snapshot.py index 8626eed9..c538860c 100644 --- a/cratedb_toolkit/retention/strategy/snapshot.py +++ b/cratedb_toolkit/retention/strategy/snapshot.py @@ -10,6 +10,7 @@ In CrateDB, tables for storing retention policies need to be created once manually. See the file setup/schema.sql in this repository. 
""" + import dataclasses import logging diff --git a/cratedb_toolkit/wtf/recorder.py b/cratedb_toolkit/wtf/recorder.py index 3c90d477..08cf5756 100644 --- a/cratedb_toolkit/wtf/recorder.py +++ b/cratedb_toolkit/wtf/recorder.py @@ -40,7 +40,8 @@ def record_once(self): ) ) self.adapter.connection.execute( - sa.text(f"INSERT INTO {table} (info) VALUES (:info)"), {"info": sample.to_dict()["data"]} # noqa: S608 + sa.text(f"INSERT INTO {table} (info) VALUES (:info)"), # noqa: S608 + {"info": sample.to_dict()["data"]}, ) def record_forever(self): diff --git a/examples/cloud_import.py b/examples/cloud_import.py index 62df562b..45c6326a 100644 --- a/examples/cloud_import.py +++ b/examples/cloud_import.py @@ -32,6 +32,7 @@ python examples/cloud_import.py e1e38d92-a650-48f1-8a70-8133f2d5c400 """ + import logging import os import sys diff --git a/examples/retention_edit.py b/examples/retention_edit.py index d25d1630..bae7f267 100644 --- a/examples/retention_edit.py +++ b/examples/retention_edit.py @@ -27,6 +27,7 @@ python examples/retention_edit.py crate://localhost:4200 """ + import logging import os diff --git a/examples/retention_retire_cutoff.py b/examples/retention_retire_cutoff.py index 310b9539..f093636a 100644 --- a/examples/retention_retire_cutoff.py +++ b/examples/retention_retire_cutoff.py @@ -33,6 +33,7 @@ python examples/retention_retire_cutoff.py crate://localhost:4200 """ + import logging import os diff --git a/pyproject.toml b/pyproject.toml index cc2727b8..877d4efe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -308,6 +308,7 @@ format = [ { cmd = "black ." }, # Configure Ruff not to auto-fix (remove!): # unused imports (F401), unused variables (F841), `print` statements (T201), and commented-out code (ERA001). + { cmd = "ruff format" }, { cmd = "ruff check --fix --ignore=ERA --ignore=F401 --ignore=F841 --ignore=T20 --ignore=ERA001 ." }, { cmd = "pyproject-fmt --keep-full-version pyproject.toml" }, ] From 8431baba4c75fa8cd281397297c4bf6bb212a850 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 16 Apr 2024 07:01:27 +0200 Subject: [PATCH 04/15] cfr: Add `ctk cfr` diagnostics program Add basic implementation for `sys-export` and `sys-import` subcommands. It is about exporting system tables of CrateDB into SQL DDL and JSONL files, and re-importing them for later analysis. 
---
 .gitignore                          |   1 +
 CHANGES.md                          |   2 +-
 cratedb_toolkit/cfr/README.md       |  61 ++++++++
 cratedb_toolkit/cfr/__init__.py     |   0
 cratedb_toolkit/cfr/backlog.md      |  15 ++
 cratedb_toolkit/cfr/cli.py          |  67 +++++++++
 cratedb_toolkit/cfr/systable.py     | 222 ++++++++++++++++++++++++++++
 cratedb_toolkit/cli.py              |   2 +
 cratedb_toolkit/sqlalchemy/patch.py |  40 +++++
 cratedb_toolkit/util/cli.py         |  23 +++
 cratedb_toolkit/util/data.py        |   9 ++
 pyproject.toml                      |   1 +
 tests/cfr/__init__.py               |   0
 tests/cfr/test_cli.py               |  40 +++++
 14 files changed, 482 insertions(+), 1 deletion(-)
 create mode 100644 cratedb_toolkit/cfr/README.md
 create mode 100644 cratedb_toolkit/cfr/__init__.py
 create mode 100644 cratedb_toolkit/cfr/backlog.md
 create mode 100644 cratedb_toolkit/cfr/cli.py
 create mode 100644 cratedb_toolkit/cfr/systable.py
 create mode 100644 tests/cfr/__init__.py
 create mode 100644 tests/cfr/test_cli.py

diff --git a/.gitignore b/.gitignore
index c272cda7..e06bc42b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,6 +8,7 @@ __pycache__
 dist
 .coverage*
 coverage.xml
+/cfr
 /foo
 /tmp
 /DOWNLOAD

diff --git a/CHANGES.md b/CHANGES.md
index f261bc05..4e96bb8c 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -2,7 +2,7 @@

 ## Unreleased

-- Add `cratedb-wtf` diagnostics program
+- Add `ctk cfr` and `ctk wtf` diagnostics programs

 ## 2024/06/11 v0.0.13
 - Dependencies: Migrate from `crate[sqlalchemy]` to `sqlalchemy-cratedb`

diff --git a/cratedb_toolkit/cfr/README.md b/cratedb_toolkit/cfr/README.md
new file mode 100644
index 00000000..87a3cd6d
--- /dev/null
+++ b/cratedb_toolkit/cfr/README.md
@@ -0,0 +1,61 @@
+# CrateDB Cluster Flight Recorder (CFR)
+
+Collect required cluster information for support requests
+and self-service debugging.
+
+
+## Synopsis
+
+Define the CrateDB database cluster address.
+```shell
+export CRATEDB_SQLALCHEMY_URL=crate://localhost/
+```
+
+Export system table information into a timestamped directory,
+by default within the `cfr/sys` directory.
+```shell
+ctk cfr sys-export
+```
+
+
+## Usage
+
+Export system table information into the given directory.
+```shell
+ctk cfr sys-export file:///var/ctk/cfr/sys
+```
+
+Import system table information from the given directory.
+```shell
+ctk cfr sys-import file://./cfr/sys/2024-04-16T05-43-37
+```
+
+To define the CrateDB database address on the
+command line, use a command like this.
+```shell
+ctk cfr --cratedb-sqlalchemy-url=crate://localhost/ sys-export
+```
+
+
+## OCI
+
+If you don't want to or can't install the program, you can also use its OCI
+container image, for example on Docker, Podman, or Kubernetes.
+
+Optionally, start a CrateDB single-node instance for testing purposes.
+```shell
+docker run --rm -it \
+  --name=cratedb --publish=4200:4200 --env=CRATE_HEAP_SIZE=4g \
+  crate/crate:nightly -Cdiscovery.type=single-node
+```
+
+Define the database URI address, and an alias for the `cfr` program.
+```shell
+echo "CRATEDB_SQLALCHEMY_URL=crate://localhost/" > .env
+alias cfr="docker run --rm -it --network=host --volume=$(pwd)/cfr:/cfr --env-file=.env ghcr.io/crate-workbench/cratedb-toolkit:latest ctk cfr"
+```
+
+Verify everything works.
+```shell
+cfr --help
+```
diff --git a/cratedb_toolkit/cfr/__init__.py b/cratedb_toolkit/cfr/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/cratedb_toolkit/cfr/backlog.md b/cratedb_toolkit/cfr/backlog.md
new file mode 100644
index 00000000..75fb78a7
--- /dev/null
+++ b/cratedb_toolkit/cfr/backlog.md
@@ -0,0 +1,15 @@
+# CFR Backlog
+
+## Iteration +1
+- sys-export: Does the program need capabilities to **LIMIT** cardinality
+  on `sys-export` operations, for example, when they are super large?
+- sys-import: Accept target database schema.
+- Combine with `ctk wtf info`
+- Converge output into tar archive
+
+## Iteration +2
+- Cluster name must go into `cfr//sys/`, for multi-tenancy operations.
+
+## Iteration +3
+- How do I get at `crate.yaml`?
+- How do I get at the log files? `docker logs`?
diff --git a/cratedb_toolkit/cfr/cli.py b/cratedb_toolkit/cfr/cli.py
new file mode 100644
index 00000000..18256211
--- /dev/null
+++ b/cratedb_toolkit/cfr/cli.py
@@ -0,0 +1,67 @@
+# Copyright (c) 2021-2024, Crate.io Inc.
+# Distributed under the terms of the AGPLv3 license, see LICENSE.
+import logging
+import sys
+
+import click
+from click_aliases import ClickAliasedGroup
+
+from cratedb_toolkit.cfr.systable import SystemTableExporter, SystemTableImporter
+from cratedb_toolkit.util.cli import (
+    boot_click,
+    error_logger,
+    make_command,
+)
+from cratedb_toolkit.util.data import jd, path_from_url
+
+logger = logging.getLogger(__name__)
+
+
+cratedb_sqlalchemy_option = click.option(
+    "--cratedb-sqlalchemy-url", envvar="CRATEDB_SQLALCHEMY_URL", type=str, required=False, help="CrateDB SQLAlchemy URL"
+)
+
+
+@click.group(cls=ClickAliasedGroup)  # type: ignore[arg-type]
+@cratedb_sqlalchemy_option
+@click.option("--verbose", is_flag=True, required=False, help="Turn on logging")
+@click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level")
+@click.option("--scrub", envvar="SCRUB", is_flag=True, required=False, help="Blank out identifiable information")
+@click.version_option()
+@click.pass_context
+def cli(ctx: click.Context, cratedb_sqlalchemy_url: str, verbose: bool, debug: bool, scrub: bool):
+    """
+    Diagnostics and informational utilities.
+ """ + if not cratedb_sqlalchemy_url: + logger.error("Unable to operate without database address") + sys.exit(1) + ctx.meta.update({"cratedb_sqlalchemy_url": cratedb_sqlalchemy_url, "scrub": scrub}) + return boot_click(ctx, verbose, debug) + + +@make_command(cli, "sys-export") +@click.argument("target", envvar="CFR_TARGET", type=str, required=False, default="file://./cfr/sys") +@click.pass_context +def sys_export(ctx: click.Context, target: str): + cratedb_sqlalchemy_url = ctx.meta["cratedb_sqlalchemy_url"] + try: + stc = SystemTableExporter(dburi=cratedb_sqlalchemy_url, target=path_from_url(target)) + path = stc.save() + jd({"path": str(path)}) + except Exception as ex: + error_logger(ctx)(ex) + sys.exit(1) + + +@make_command(cli, "sys-import") +@click.argument("source", envvar="CFR_SOURCE", type=str, required=True) +@click.pass_context +def sys_import(ctx: click.Context, source: str): + cratedb_sqlalchemy_url = ctx.meta["cratedb_sqlalchemy_url"] + try: + stc = SystemTableImporter(dburi=cratedb_sqlalchemy_url, source=path_from_url(source)) + stc.load() + except Exception as ex: + error_logger(ctx)(ex) + sys.exit(1) diff --git a/cratedb_toolkit/cfr/systable.py b/cratedb_toolkit/cfr/systable.py new file mode 100644 index 00000000..89cb12bc --- /dev/null +++ b/cratedb_toolkit/cfr/systable.py @@ -0,0 +1,222 @@ +""" +CrateDB Diagnostics: System Tables Exporter and Importer. + +Schemas and results of following queries should be included: +```sql +SELECT * FROM sys.cluster +SELECT * FROM sys.nodes +SELECT * FROM sys.shards +SELECT * FROM sys.allocations +SELECT * FROM sys.jobs_log +SELECT * FROM sys.operations_log +``` + +https://cratedb.com/docs/python/en/latest/by-example/sqlalchemy/inspection-reflection.html +https://docs.sqlalchemy.org/en/20/faq/metadata_schema.html#how-can-i-get-the-create-table-drop-table-output-as-a-string +""" + +import datetime as dt +import logging +import typing as t +from pathlib import Path + +import polars as pl +import sqlalchemy as sa +from tqdm import tqdm + +from cratedb_toolkit.sqlalchemy.patch import patch_encoder +from cratedb_toolkit.util import DatabaseAdapter +from cratedb_toolkit.util.cli import error_logger + +logger = logging.getLogger(__name__) + + +DataFormat = t.Literal["csv", "jsonl", "ndjson", "parquet"] + + +class SystemTableKnowledge: + """ + Manage a few bits of knowledge about CrateDB internals. + """ + + # Name of CrateDB's schema for system tables. + SYS_SCHEMA = "sys" + + # TODO: Reflecting the `summits` table raises an error. + # AttributeError: 'UserDefinedType' object has no attribute 'get_col_spec' + REFLECTION_BLOCKLIST = ["summits"] + + +class ExportSettings: + """ + Manage a few bits of knowledge about how to export system tables from CrateDB. + """ + + # Subdirectories where to store schema vs. data information. + SCHEMA_PATH = "schema" + DATA_PATH = "data" + + # The filename prefix when storing tables to disk. + TABLE_FILENAME_PREFIX = "sys-" + + +class SystemTableInspector: + """ + Reflect schema information from CrateDB system tables. 
+ """ + + def __init__(self, dburi: str): + self.dburi = dburi + self.adapter = DatabaseAdapter(dburi=self.dburi) + self.engine = self.adapter.engine + self.inspector = sa.inspect(self.engine) + + def table_names(self): + return self.inspector.get_table_names(schema=SystemTableKnowledge.SYS_SCHEMA) + + def ddl(self, tablename_in: str, tablename_out: str, out_schema: str = None, with_drop_table: bool = False) -> str: + meta = sa.MetaData(schema=SystemTableKnowledge.SYS_SCHEMA) + table = sa.Table(tablename_in, meta, autoload_with=self.engine) + table.schema = out_schema + table.name = tablename_out + sql = "" + if with_drop_table: + sql += sa.schema.DropTable(table, if_exists=True).compile(self.engine).string.strip() + ";\n" + sql += sa.schema.CreateTable(table, if_not_exists=True).compile(self.engine).string.strip() + ";\n" + return sql + + +class SystemTableExporter: + """ + Export schema and data from CrateDB system tables. + """ + + def __init__(self, dburi: str, target: t.Union[Path], data_format: DataFormat = "jsonl"): + self.dburi = dburi + self.target = target + self.data_format = data_format + self.adapter = DatabaseAdapter(dburi=self.dburi) + self.engine = self.adapter.engine + self.inspector = SystemTableInspector(dburi=self.dburi) + self.target.mkdir(exist_ok=True, parents=True) + + def read_table(self, tablename: str) -> pl.DataFrame: + sql = f'SELECT * FROM "{SystemTableKnowledge.SYS_SCHEMA}"."{tablename}"' # noqa: S608 + # logger.info(f"Running SQL: {sql}") # noqa: ERA001 + return pl.read_database( + query=sql, # noqa: S608 + connection=self.engine, + ) + + def dump_table(self, frame: pl.DataFrame, file: t.Union[t.TextIO, None] = None): + if self.data_format == "csv": + # polars.exceptions.ComputeError: CSV format does not support nested data + # return df.write_csv() # noqa: ERA001 + return frame.to_pandas().to_csv(file) + elif self.data_format in ["jsonl", "ndjson"]: + return frame.write_ndjson(file and file.buffer) # type: ignore[arg-type] + elif self.data_format in ["parquet", "pq"]: + return frame.write_parquet(file and file.buffer) # type: ignore[arg-type] + else: + raise NotImplementedError(f"Output format not implemented: {self.data_format}") + + def save(self) -> Path: + timestamp = dt.datetime.now().strftime("%Y-%m-%dT%H-%M-%S") + path = self.target / timestamp + logger.info(f"Exporting system tables to: {path}") + system_tables = self.inspector.table_names() + path_schema = path / ExportSettings.SCHEMA_PATH + path_data = path / ExportSettings.DATA_PATH + path_schema.mkdir(parents=True, exist_ok=True) + path_data.mkdir(parents=True, exist_ok=True) + table_count = 0 + for tablename in tqdm(system_tables, disable=None): + if tablename in SystemTableKnowledge.REFLECTION_BLOCKLIST: + continue + + table_count += 1 + + path_table_schema = path_schema / f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}.sql" + path_table_data = path_data / f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}.{self.data_format}" + tablename_out = f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}" + + # Write schema file. + with open(path_table_schema, "w") as fh_schema: + print(self.inspector.ddl(tablename_in=tablename, tablename_out=tablename_out), file=fh_schema) + + # Write data file. 
+ df = self.read_table(tablename=tablename) + if df.is_empty(): + continue + mode = "w" + if self.data_format in ["parquet", "pq"]: + mode = "wb" + with open(path_table_data, mode) as fh_data: + self.dump_table(frame=df, file=t.cast(t.TextIO, fh_data)) + + logger.info(f"Successfully exported {table_count} system tables") + return path + + +class SystemTableImporter: + """ + Import schema and data about CrateDB system tables. + """ + + def __init__(self, dburi: str, source: Path, data_format: DataFormat = "jsonl", debug: bool = False): + self.dburi = dburi + self.source = source + self.data_format = data_format + self.debug = debug + self.adapter = DatabaseAdapter(dburi=self.dburi) + + def table_names(self): + path_schema = self.source / ExportSettings.SCHEMA_PATH + names = [] + for item in path_schema.glob("*.sql"): + name = item.name.replace(ExportSettings.TABLE_FILENAME_PREFIX, "").replace(".sql", "") + names.append(name) + return names + + def load(self): + path_schema = self.source / ExportSettings.SCHEMA_PATH + path_data = self.source / ExportSettings.DATA_PATH + + if not path_schema.exists(): + raise FileNotFoundError(f"Path does not exist: {path_schema}") + + logger.info(f"Importing system tables from: {self.source}") + + for tablename in tqdm(self.table_names()): + tablename_restored = ExportSettings.TABLE_FILENAME_PREFIX + tablename + + path_table_schema = path_schema / f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}.sql" + path_table_data = path_data / f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}.{self.data_format}" + + # Skip import of non-existing or empty files. + if not path_table_data.exists() or path_table_data.stat().st_size == 0: + continue + + # Invoke SQL DDL. + schema_sql = path_table_schema.read_text() + self.adapter.run_sql(schema_sql) + + # Load data. + try: + df: pl.DataFrame = self.load_table(path_table_data) + df.write_database(table_name=tablename_restored, connection=self.dburi, if_table_exists="append") + except Exception as ex: + error_logger(self.debug)(f"Importing table failed: {tablename}. 
Reason: {ex}") + + # df.to_pandas().to_sql(name=tablename, con=self.adapter.engine, if_exists="append", index=False) # noqa: ERA001, E501 + + def load_table(self, path: Path) -> pl.DataFrame: + if path.suffix in [".jsonl"]: + return pl.read_ndjson(path) + elif path.suffix in [".parquet", ".pq"]: + return pl.read_parquet(path) + else: + raise NotImplementedError(f"Input format not implemented: {path.suffix}") + + +patch_encoder() diff --git a/cratedb_toolkit/cli.py b/cratedb_toolkit/cli.py index 9315a701..661b7631 100644 --- a/cratedb_toolkit/cli.py +++ b/cratedb_toolkit/cli.py @@ -3,6 +3,7 @@ from cratedb_toolkit.util.cli import boot_click +from .cfr.cli import cli as cfr_cli from .cluster.cli import cli as cloud_cli from .io.cli import cli as io_cli from .job.cli import cli_list_jobs @@ -19,6 +20,7 @@ def cli(ctx: click.Context, verbose: bool, debug: bool): return boot_click(ctx, verbose, debug) +cli.add_command(cfr_cli, name="cfr") cli.add_command(cloud_cli, name="cluster") cli.add_command(io_cli, name="load") cli.add_command(shell_cli, name="shell") diff --git a/cratedb_toolkit/sqlalchemy/patch.py b/cratedb_toolkit/sqlalchemy/patch.py index ec9e7d44..c188a5a1 100644 --- a/cratedb_toolkit/sqlalchemy/patch.py +++ b/cratedb_toolkit/sqlalchemy/patch.py @@ -1,5 +1,11 @@ +import calendar +import datetime as dt +import json import typing as t +from decimal import Decimal +from uuid import UUID +import numpy as np import sqlalchemy as sa @@ -35,3 +41,37 @@ def get_table_names(self, connection: sa.Connection, schema: t.Optional[str] = N return get_table_names_dist(self, connection=connection, schema=schema, **kw) dialect.get_table_names = get_table_names # type: ignore + + +def patch_encoder(): + import crate.client.http + + crate.client.http.CrateJsonEncoder = CrateJsonEncoderWithNumPy + + +class CrateJsonEncoderWithNumPy(json.JSONEncoder): + epoch_aware = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc) + epoch_naive = dt.datetime(1970, 1, 1) + + def default(self, o): + # Vanilla CrateDB Python. + if isinstance(o, (Decimal, UUID)): + return str(o) + if isinstance(o, dt.datetime): + if o.tzinfo is not None: + delta = o - self.epoch_aware + else: + delta = o - self.epoch_naive + return int(delta.microseconds / 1000.0 + (delta.seconds + delta.days * 24 * 3600) * 1000.0) + if isinstance(o, dt.date): + return calendar.timegm(o.timetuple()) * 1000 + + # NumPy ndarray and friends. 
+ # https://stackoverflow.com/a/49677241 + if isinstance(o, np.integer): + return int(o) + elif isinstance(o, np.floating): + return float(o) + elif isinstance(o, np.ndarray): + return o.tolist() + return json.JSONEncoder.default(self, o) diff --git a/cratedb_toolkit/util/cli.py b/cratedb_toolkit/util/cli.py index f5956dea..f5ebc3ea 100644 --- a/cratedb_toolkit/util/cli.py +++ b/cratedb_toolkit/util/cli.py @@ -108,3 +108,26 @@ def decorator(f): return f return decorator + + +def error_level_by_debug(debug: bool): + if debug: + return logger.exception + else: + return logger.error + + +def running_with_debug(ctx: click.Context) -> bool: + return ( + (ctx.parent and ctx.parent.params.get("debug", False)) + or (ctx.parent and ctx.parent.parent and ctx.parent.parent.params.get("debug", False)) + or False + ) + + +def error_logger(about: t.Union[click.Context, bool]) -> t.Callable: + if isinstance(about, click.Context): + return error_level_by_debug(running_with_debug(about)) + if isinstance(about, bool): + return error_level_by_debug(about) + raise TypeError(f"Unknown type for argument: {about}") diff --git a/cratedb_toolkit/util/data.py b/cratedb_toolkit/util/data.py index f8efb1bc..396d7670 100644 --- a/cratedb_toolkit/util/data.py +++ b/cratedb_toolkit/util/data.py @@ -2,6 +2,9 @@ import json import sys import typing as t +from pathlib import Path + +from yarl import URL def jd(data: t.Any): @@ -43,3 +46,9 @@ def default(self, o): return o.isoformat() return json.JSONEncoder.default(self, o) + + +def path_from_url(url: str): + url_obj = URL(url) + path = Path((url_obj.host or "") + (url_obj.path or "")) + return path.absolute() diff --git a/pyproject.toml b/pyproject.toml index 877d4efe..29292f4d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -98,6 +98,7 @@ dependencies = [ "python-slugify<9", "sqlalchemy-cratedb>=0.36.1,<1", "sqlparse<0.6", + "tqdm<5", 'typing-extensions<5; python_version <= "3.7"', "uvicorn<0.25", ] diff --git a/tests/cfr/__init__.py b/tests/cfr/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/cfr/test_cli.py b/tests/cfr/test_cli.py new file mode 100644 index 00000000..fc60705d --- /dev/null +++ b/tests/cfr/test_cli.py @@ -0,0 +1,40 @@ +import json +import re +from pathlib import Path + +from click.testing import CliRunner + +from cratedb_toolkit.cfr.cli import cli + + +def filenames(path: Path): + return sorted([item.name for item in path.iterdir()]) + + +def test_cfr_cli_export(cratedb, tmp_path, caplog): + """ + Verify `ctk cfr sys-export` works. + """ + + # Invoke command. + runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb.database.dburi, "CFR_TARGET": str(tmp_path)}) + result = runner.invoke( + cli, + args="--debug sys-export", + catch_exceptions=False, + ) + assert result.exit_code == 0 + + # Verify log output. + assert "Exporting system tables to" in caplog.text + assert re.search(r"Successfully exported \d+ system tables", caplog.text), "Log message missing" + + # Verify outcome. 
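+    # `sys-export` prints a JSON object which includes the export path.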
+    path = Path(json.loads(result.output)["path"])
+    assert filenames(path) == ["data", "schema"]
+
+    schema_files = filenames(path / "schema")
+    data_files = filenames(path / "schema")
+
+    assert len(schema_files) >= 19
+    assert len(data_files) >= 19

From 83db52c49504d09f89a2b1d30f506a027fb1457c Mon Sep 17 00:00:00 2001
From: Andreas Motl
Date: Thu, 18 Apr 2024 02:07:43 +0200
Subject: [PATCH 05/15] cfr: Use cluster name within filesystem path

---
 CHANGES.md                      |  1 +
 cratedb_toolkit/cfr/README.md   | 48 ++++++++++++++++++++++++---------
 cratedb_toolkit/cfr/cli.py      |  2 +-
 cratedb_toolkit/cfr/systable.py | 18 ++++++++-----
 cratedb_toolkit/wtf/core.py     |  5 ++++
 pyproject.toml                  |  3 +--
 6 files changed, 56 insertions(+), 21 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 4e96bb8c..f89e3d95 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -3,6 +3,7 @@
 ## Unreleased

 - Add `ctk cfr` and `ctk wtf` diagnostics programs
+- Remove support for Python 3.7

 ## 2024/06/11 v0.0.13
 - Dependencies: Migrate from `crate[sqlalchemy]` to `sqlalchemy-cratedb`

diff --git a/cratedb_toolkit/cfr/README.md b/cratedb_toolkit/cfr/README.md
index 87a3cd6d..6e7320ca 100644
--- a/cratedb_toolkit/cfr/README.md
+++ b/cratedb_toolkit/cfr/README.md
@@ -1,37 +1,56 @@
 # CrateDB Cluster Flight Recorder (CFR)

-Collect required cluster information for support requests
+CFR helps collect information about CrateDB clusters for support requests
 and self-service debugging.


 ## Synopsis

-Define the CrateDB database cluster address.
+Define the CrateDB database cluster address using the `CRATEDB_SQLALCHEMY_URL`
+environment variable.
 ```shell
 export CRATEDB_SQLALCHEMY_URL=crate://localhost/
 ```

-Export system table information into a timestamped directory,
-by default within the `cfr/sys` directory.
+Export system table information into a timestamped directory, by default
+within the current working directory, using the pattern
+`cfr/{clustername}/{timestamp}/sys`.
 ```shell
 ctk cfr sys-export
 ```

+Import system table information from the given directory.
+```shell
+ctk cfr sys-import file://./cfr/crate/2024-04-18T01-13-41/sys
+```
+
+
+## Configuration
+
+### Target and source directories

-## Usage
+The target directory for the export operation, and the source directory for
+the import operation, can be specified using a single positional argument on
+the command line.

 Export system table information into the given directory.
 ```shell
-ctk cfr sys-export file:///var/ctk/cfr/sys
+ctk cfr sys-export file:///var/ctk/cfr
 ```

 Import system table information from the given directory.
 ```shell
-ctk cfr sys-import file://./cfr/sys/2024-04-16T05-43-37
+ctk cfr sys-import file:///var/ctk/cfr/crate/2024-04-18T01-13-41/sys
 ```

-To define the CrateDB database address on the
-command line, use a command like this.
+Alternatively, you can use the `CFR_TARGET` and `CFR_SOURCE` environment
+variables.
+
+### CrateDB database address
+
+The CrateDB database address can be defined on the command line, using the
+`--cratedb-sqlalchemy-url` option, or by using the `CRATEDB_SQLALCHEMY_URL`
+environment variable.
 ```shell
 ctk cfr --cratedb-sqlalchemy-url=crate://localhost/ sys-export
 ```
@@ -40,7 +59,7 @@ ctk cfr --cratedb-sqlalchemy-url=crate://localhost/ sys-export
 ## OCI

 If you don't want to or can't install the program, you can also use its OCI
-container image, for example on Docker, Podman, or Kubernetes.
+container image, for example on Docker, Podman, Kubernetes, and friends.

 Optionally, start a CrateDB single-node instance for testing purposes.
```shell @@ -55,7 +74,12 @@ echo "CRATEDB_SQLALCHEMY_URL=crate://localhost/" > .env alias cfr="docker run --rm -it --network=host --volume=$(PWD)/cfr:/cfr --env-file=.env ghcr.io/crate-workbench/cratedb-toolkit:latest ctk cfr" ``` -Verify everything works. +Export system table information. +```shell +cfr sys-export +``` + +Import system table information. ```shell -cfr --help +cfr sys-import cfr/crate/2024-04-18T01-13-41/sys ``` diff --git a/cratedb_toolkit/cfr/cli.py b/cratedb_toolkit/cfr/cli.py index 18256211..4aab7450 100644 --- a/cratedb_toolkit/cfr/cli.py +++ b/cratedb_toolkit/cfr/cli.py @@ -41,7 +41,7 @@ def cli(ctx: click.Context, cratedb_sqlalchemy_url: str, verbose: bool, debug: b @make_command(cli, "sys-export") -@click.argument("target", envvar="CFR_TARGET", type=str, required=False, default="file://./cfr/sys") +@click.argument("target", envvar="CFR_TARGET", type=str, required=False, default="file://./cfr") @click.pass_context def sys_export(ctx: click.Context, target: str): cratedb_sqlalchemy_url = ctx.meta["cratedb_sqlalchemy_url"] diff --git a/cratedb_toolkit/cfr/systable.py b/cratedb_toolkit/cfr/systable.py index 89cb12bc..18b0962e 100644 --- a/cratedb_toolkit/cfr/systable.py +++ b/cratedb_toolkit/cfr/systable.py @@ -27,6 +27,7 @@ from cratedb_toolkit.sqlalchemy.patch import patch_encoder from cratedb_toolkit.util import DatabaseAdapter from cratedb_toolkit.util.cli import error_logger +from cratedb_toolkit.wtf.core import InfoContainer logger = logging.getLogger(__name__) @@ -96,16 +97,16 @@ def __init__(self, dburi: str, target: t.Union[Path], data_format: DataFormat = self.target = target self.data_format = data_format self.adapter = DatabaseAdapter(dburi=self.dburi) - self.engine = self.adapter.engine + self.info = InfoContainer(adapter=self.adapter) self.inspector = SystemTableInspector(dburi=self.dburi) self.target.mkdir(exist_ok=True, parents=True) def read_table(self, tablename: str) -> pl.DataFrame: sql = f'SELECT * FROM "{SystemTableKnowledge.SYS_SCHEMA}"."{tablename}"' # noqa: S608 - # logger.info(f"Running SQL: {sql}") # noqa: ERA001 + logger.debug(f"Running SQL: {sql}") return pl.read_database( query=sql, # noqa: S608 - connection=self.engine, + connection=self.adapter.engine, ) def dump_table(self, frame: pl.DataFrame, file: t.Union[t.TextIO, None] = None): @@ -122,7 +123,7 @@ def dump_table(self, frame: pl.DataFrame, file: t.Union[t.TextIO, None] = None): def save(self) -> Path: timestamp = dt.datetime.now().strftime("%Y-%m-%dT%H-%M-%S") - path = self.target / timestamp + path = self.target / self.info.cluster_name / timestamp / "sys" logger.info(f"Exporting system tables to: {path}") system_tables = self.inspector.table_names() path_schema = path / ExportSettings.SCHEMA_PATH @@ -134,8 +135,6 @@ def save(self) -> Path: if tablename in SystemTableKnowledge.REFLECTION_BLOCKLIST: continue - table_count += 1 - path_table_schema = path_schema / f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}.sql" path_table_data = path_data / f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}.{self.data_format}" tablename_out = f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}" @@ -148,6 +147,9 @@ def save(self) -> Path: df = self.read_table(tablename=tablename) if df.is_empty(): continue + + table_count += 1 + mode = "w" if self.data_format in ["parquet", "pq"]: mode = "wb" @@ -187,6 +189,7 @@ def load(self): logger.info(f"Importing system tables from: {self.source}") + table_count = 0 for tablename in tqdm(self.table_names()): tablename_restored = 
ExportSettings.TABLE_FILENAME_PREFIX + tablename @@ -197,6 +200,8 @@ def load(self): if not path_table_data.exists() or path_table_data.stat().st_size == 0: continue + table_count += 1 + # Invoke SQL DDL. schema_sql = path_table_schema.read_text() self.adapter.run_sql(schema_sql) @@ -208,6 +213,7 @@ def load(self): except Exception as ex: error_logger(self.debug)(f"Importing table failed: {tablename}. Reason: {ex}") + logger.info(f"Successfully imported {table_count} system tables") # df.to_pandas().to_sql(name=tablename, con=self.adapter.engine, if_exists="append", index=False) # noqa: ERA001, E501 def load_table(self, path: Path) -> pl.DataFrame: diff --git a/cratedb_toolkit/wtf/core.py b/cratedb_toolkit/wtf/core.py index 77a7a784..b05ebe7f 100644 --- a/cratedb_toolkit/wtf/core.py +++ b/cratedb_toolkit/wtf/core.py @@ -1,6 +1,7 @@ # Copyright (c) 2021-2024, Crate.io Inc. # Distributed under the terms of the AGPLv3 license, see LICENSE. import typing as t +from functools import cached_property import boltons.ecoutils @@ -33,6 +34,10 @@ def register_builtins(self): Library.Shards.translog_uncommitted_size, ) + @cached_property + def cluster_name(self): + return self.evaluate_element(Library.Health.cluster_name) + def to_dict(self, data=None): return super().to_dict(data={"system": self.system(), "database": self.database()}) diff --git a/pyproject.toml b/pyproject.toml index 29292f4d..bb973efc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,7 +25,7 @@ authors = [ { name = "Marija Selakovic", email = "marija@crate.io" }, { name = "Andreas Motl", email = "andreas.motl@crate.io" }, ] -requires-python = ">=3.7" +requires-python = ">=3.8" classifiers = [ "Development Status :: 3 - Alpha", "Environment :: Console", @@ -46,7 +46,6 @@ classifiers = [ "Operating System :: Unix", "Programming Language :: Python", "Programming Language :: Python :: 3 :: Only", - "Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", From eadbd80da2fa0e04801c10fe2a9d727e2ae59554 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 18 Apr 2024 13:52:55 +0200 Subject: [PATCH 06/15] common: Improve table name quoting --- .../testing/testcontainers/cratedb.py | 3 +- cratedb_toolkit/util/database.py | 38 ++++++++++++++----- pyproject.toml | 3 +- 3 files changed, 32 insertions(+), 12 deletions(-) diff --git a/cratedb_toolkit/testing/testcontainers/cratedb.py b/cratedb_toolkit/testing/testcontainers/cratedb.py index 183bac03..3b7433e1 100644 --- a/cratedb_toolkit/testing/testcontainers/cratedb.py +++ b/cratedb_toolkit/testing/testcontainers/cratedb.py @@ -24,6 +24,7 @@ from cratedb_toolkit.testing.testcontainers.util import KeepaliveContainer, asbool from cratedb_toolkit.util import DatabaseAdapter +from cratedb_toolkit.util.database import quote_table_name logger = logging.getLogger(__name__) @@ -188,7 +189,7 @@ def reset(self, tables: Optional[list] = None): """ if tables and self.database: for reset_table in tables: - self.database.connection.exec_driver_sql(f"DROP TABLE IF EXISTS {reset_table};") + self.database.connection.exec_driver_sql(f"DROP TABLE IF EXISTS {quote_table_name(reset_table)};") def get_connection_url(self, *args, **kwargs): """ diff --git a/cratedb_toolkit/util/database.py b/cratedb_toolkit/util/database.py index 6693adf5..1a40061d 100644 --- a/cratedb_toolkit/util/database.py +++ b/cratedb_toolkit/util/database.py @@ -78,11 +78,11 @@ def run_sql_real(self, sql: str, records: bool = 
False): else: return results - def count_records(self, tablename_full: str, errors: Literal["raise", "ignore"] = "raise"): + def count_records(self, name: str, errors: Literal["raise", "ignore"] = "raise"): """ Return number of records in table. """ - sql = f"SELECT COUNT(*) AS count FROM {tablename_full};" # noqa: S608 + sql = f"SELECT COUNT(*) AS count FROM {quote_table_name(name)};" # noqa: S608 try: results = self.run_sql(sql=sql) except ProgrammingError as ex: @@ -92,30 +92,30 @@ def count_records(self, tablename_full: str, errors: Literal["raise", "ignore"] return 0 return results[0][0] - def table_exists(self, tablename_full: str) -> bool: + def table_exists(self, name: str) -> bool: """ Check whether given table exists. """ - sql = f"SELECT 1 FROM {tablename_full} LIMIT 1;" # noqa: S608 + sql = f"SELECT 1 FROM {quote_table_name(name)} LIMIT 1;" # noqa: S608 try: self.run_sql(sql=sql) return True except Exception: return False - def refresh_table(self, tablename_full: str): + def refresh_table(self, name: str): """ Run a `REFRESH TABLE ...` command. """ - sql = f"REFRESH TABLE {tablename_full};" # noqa: S608 + sql = f"REFRESH TABLE {quote_table_name(name)};" # noqa: S608 self.run_sql(sql=sql) return True - def prune_table(self, tablename_full: str, errors: Literal["raise", "ignore"] = "raise"): + def prune_table(self, name: str, errors: Literal["raise", "ignore"] = "raise"): """ Run a `DELETE FROM ...` command. """ - sql = f"DELETE FROM {tablename_full};" # noqa: S608 + sql = f"DELETE FROM {quote_table_name(name)};" # noqa: S608 try: self.run_sql(sql=sql) except ProgrammingError as ex: @@ -125,11 +125,11 @@ def prune_table(self, tablename_full: str, errors: Literal["raise", "ignore"] = return False return True - def drop_table(self, tablename_full: str): + def drop_table(self, name: str): """ Run a `DROP TABLE ...` command. """ - sql = f"DROP TABLE IF EXISTS {tablename_full};" # noqa: S608 + sql = f"DROP TABLE IF EXISTS {quote_table_name(name)};" # noqa: S608 self.run_sql(sql=sql) return True @@ -332,3 +332,21 @@ def decode_database_table(url: str) -> t.Tuple[str, str]: if url_.scheme == "crate" and not database: database = url_.query_params.get("schema") return database, table + + +def quote_table_name(name: str) -> str: + """ + Quote table name if not happened already. + + In: foo + Out: "foo" + + In: "foo" + Out: "foo" + + In: foo.bar + Out: foo.bar + """ + if '"' not in name and "." 
not in name:
+        name = f'"{name}"'
+    return name
diff --git a/pyproject.toml b/pyproject.toml
index bb973efc..29292f4d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -25,7 +25,7 @@ authors = [
   { name = "Marija Selakovic", email = "marija@crate.io" },
   { name = "Andreas Motl", email = "andreas.motl@crate.io" },
 ]
-requires-python = ">=3.8"
+requires-python = ">=3.7"
 classifiers = [
   "Development Status :: 3 - Alpha",
   "Environment :: Console",
@@ -46,6 +46,7 @@ classifiers = [
   "Operating System :: Unix",
   "Programming Language :: Python",
   "Programming Language :: Python :: 3 :: Only",
+  "Programming Language :: Python :: 3.7",
   "Programming Language :: Python :: 3.8",
   "Programming Language :: Python :: 3.9",
   "Programming Language :: Python :: 3.10",

From 2a20d058ef950e2043911887aac8786643246474 Mon Sep 17 00:00:00 2001
From: Andreas Motl
Date: Thu, 18 Apr 2024 14:39:44 +0200
Subject: [PATCH 07/15] cfr: Add software tests for `sys-import`

---
 cratedb_toolkit/cfr/backlog.md        | 16 ++++--
 cratedb_toolkit/cfr/systable.py       |  2 +
 cratedb_toolkit/wtf/backlog.md        |  2 +
 doc/backlog.md                        | 12 +++++
 tests/cfr/assets/sys-operations.jsonl |  1 +
 tests/cfr/assets/sys-operations.sql   |  8 +++
 tests/cfr/test_cli.py                 | 70 +++++++++++++++++++++++++++
 7 files changed, 106 insertions(+), 5 deletions(-)
 create mode 100644 tests/cfr/assets/sys-operations.jsonl
 create mode 100644 tests/cfr/assets/sys-operations.sql

diff --git a/cratedb_toolkit/cfr/backlog.md b/cratedb_toolkit/cfr/backlog.md
index 75fb78a7..c71f517a 100644
--- a/cratedb_toolkit/cfr/backlog.md
+++ b/cratedb_toolkit/cfr/backlog.md
@@ -1,15 +1,21 @@
 # CFR Backlog

 ## Iteration +1
-- sys-export: Does the program need capabilities to **LIMIT** cardinality
-  on `sys-export` operations, for example, when they are super large?
-- sys-import: Accept target database schema.
-- Combine with `ctk wtf info`
+- Software tests
 - Converge output into tar archive
+- Combine with `ctk wtf info`
+  - On sys-export, add it to the CFR package
+  - After sys-import, use it to access the imported data

 ## Iteration +2
-- Cluster name must go into `cfr//sys/`, for multi-tenancy operations.
+- sys-export: Does the program need capabilities to **LIMIT** cardinality
+  on `sys-export` operations, for example, when they are super large?
+- sys-import: Accept target database schema.

 ## Iteration +3
 - How do I get at `crate.yaml`?
 - How do I get at the log files? `docker logs`?
+- Use OpenTelemetry traces in one way or another?
+
+## Done
+- Cluster name must go into `cfr//sys/`, for multi-tenancy operations.
diff --git a/cratedb_toolkit/cfr/systable.py b/cratedb_toolkit/cfr/systable.py index 18b0962e..9fb122a4 100644 --- a/cratedb_toolkit/cfr/systable.py +++ b/cratedb_toolkit/cfr/systable.py @@ -107,6 +107,7 @@ def read_table(self, tablename: str) -> pl.DataFrame: return pl.read_database( query=sql, # noqa: S608 connection=self.adapter.engine, + infer_schema_length=1000, ) def dump_table(self, frame: pl.DataFrame, file: t.Union[t.TextIO, None] = None): @@ -132,6 +133,7 @@ def save(self) -> Path: path_data.mkdir(parents=True, exist_ok=True) table_count = 0 for tablename in tqdm(system_tables, disable=None): + logger.debug(f"Exporting table: {tablename}") if tablename in SystemTableKnowledge.REFLECTION_BLOCKLIST: continue diff --git a/cratedb_toolkit/wtf/backlog.md b/cratedb_toolkit/wtf/backlog.md index cd6fc7e2..9f0c82e2 100644 --- a/cratedb_toolkit/wtf/backlog.md +++ b/cratedb_toolkit/wtf/backlog.md @@ -10,6 +10,8 @@ - High-level analysis, evaluating a set of threshold rules - High-level summary reports with heuristics support - Network diagnostics? +- Provide a GUI? + https://github.com/davep/pispy ## Iteration +3 - Make it work with CrateDB Cloud. diff --git a/doc/backlog.md b/doc/backlog.md index 4bfaead6..1a25a5d9 100644 --- a/doc/backlog.md +++ b/doc/backlog.md @@ -32,6 +32,18 @@ - Store `CRATEDB_CLOUD_CLUSTER_ID` into `cratedb_toolkit.constants` - Cloud Tests: Verify file uploads - Docs: Add examples in more languages: Java, JavaScript, Lua, PHP +- Docs: + - https://pypi.org/project/red-panda/ + - https://pypi.org/project/redpanda/ + https://github.com/amancevice/redpanda + - https://pypi.org/project/alyeska/ +- Kafka: + - https://github.com/bakdata/streams-bootstrap + - https://pypi.org/project/kashpy/ +- CFR/WTF + - https://github.com/peekjef72/sql_exporter +- Migrate / I/O adapter + - https://community.cratedb.com/t/migrating-from-postgresql-or-timescale-to-cratedb/620 ## Iteration +2.5 - Retention: Improve retention subsystem CLI API. diff --git a/tests/cfr/assets/sys-operations.jsonl b/tests/cfr/assets/sys-operations.jsonl new file mode 100644 index 00000000..f220dbcf --- /dev/null +++ b/tests/cfr/assets/sys-operations.jsonl @@ -0,0 +1 @@ +{"id":"0","job_id":"3cd98282-50f6-c25d-c69e-90eeea6d7afc","name":"collect","node":{"name":"Testa del Rutor","id":"sy7vpr9mSzS4RMwKJTxWkA"},"started":1713399434586,"used_bytes":0} diff --git a/tests/cfr/assets/sys-operations.sql b/tests/cfr/assets/sys-operations.sql new file mode 100644 index 00000000..580624a9 --- /dev/null +++ b/tests/cfr/assets/sys-operations.sql @@ -0,0 +1,8 @@ +CREATE TABLE IF NOT EXISTS "sys-operations" ( + id STRING, + job_id STRING, + name STRING, + node OBJECT, + started TIMESTAMP, + used_bytes LONG +); diff --git a/tests/cfr/test_cli.py b/tests/cfr/test_cli.py index fc60705d..a282af29 100644 --- a/tests/cfr/test_cli.py +++ b/tests/cfr/test_cli.py @@ -1,5 +1,14 @@ import json import re +import shutil +import sys + +import tests + +if sys.version_info < (3, 9): + from importlib_resources import files +else: + from importlib.resources import files from pathlib import Path from click.testing import CliRunner @@ -38,3 +47,64 @@ def test_cfr_cli_export(cratedb, tmp_path, caplog): assert len(schema_files) >= 19 assert len(data_files) >= 19 + + +def test_cfr_cli_import(cratedb, tmp_path, caplog): + """ + Verify `ctk cfr sys-import` works. + """ + + # Blank database canvas. 
imported_system_tables = [
+        "sys-allocations",
+        "sys-checks",
+        "sys-cluster",
+        "sys-health",
+        "sys-jobs",
+        "sys-jobs_log",
+        "sys-jobs_metrics",
+        "sys-node_checks",
+        "sys-nodes",
+        "sys-operations",
+        "sys-operations_log",
+        "sys-privileges",
+        "sys-repositories",
+        "sys-roles",
+        "sys-segments",
+        "sys-shards",
+        "sys-snapshot_restore",
+        "sys-snapshots",
+        "sys-users",
+    ]
+    cratedb.reset(imported_system_tables)
+
+    # Provision filesystem to look like a fake `sys-export` trace.
+    assets_path = files(tests.cfr) / "assets"
+    sys_operations_schema = assets_path / "sys-operations.sql"
+    sys_operations_data = assets_path / "sys-operations.jsonl"
+    schema_path = tmp_path / "schema"
+    data_path = tmp_path / "data"
+    schema_path.mkdir()
+    data_path.mkdir()
+    shutil.copy(sys_operations_schema, schema_path)
+    shutil.copy(sys_operations_data, data_path)
+
+    # Invoke command.
+    runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb.database.dburi, "CFR_SOURCE": str(tmp_path)})
+    result = runner.invoke(
+        cli,
+        args="--debug sys-import",
+        catch_exceptions=False,
+    )
+    assert result.exit_code == 0
+
+    # Verify log output.
+    assert "Importing system tables from" in caplog.text
+    assert re.search(r"Successfully imported \d+ system tables", caplog.text), "Log message missing"
+
+    # Verify outcome.
+    results = cratedb.database.run_sql("SHOW TABLES", records=True)
+    assert results == [{"table_name": "sys-operations"}]
+
+    cratedb.database.run_sql('REFRESH TABLE "sys-operations"')
+    assert cratedb.database.count_records("sys-operations") == 1

From e6ef4113e9e5aca5293542e0e1c4270c278544a0 Mon Sep 17 00:00:00 2001
From: Andreas Motl
Date: Wed, 8 May 2024 11:37:51 +0200
Subject: [PATCH 08/15] Documentation: Slot in docs about CFR and WTF

---
 {cratedb_toolkit => doc}/cfr/backlog.md |  4 +++-
 .../cfr/README.md => doc/cfr/index.md   | 16 +++++++++++++++-
 doc/index.md                            |  2 ++
 {cratedb_toolkit => doc}/wtf/backlog.md |  7 ++++++-
 .../wtf/README.md => doc/wtf/index.md   | 16 +++++++++++++++-
 5 files changed, 41 insertions(+), 4 deletions(-)
 rename {cratedb_toolkit => doc}/cfr/backlog.md (78%)
 rename cratedb_toolkit/cfr/README.md => doc/cfr/index.md (91%)
 rename {cratedb_toolkit => doc}/wtf/backlog.md (71%)
 rename cratedb_toolkit/wtf/README.md => doc/wtf/index.md (82%)

diff --git a/cratedb_toolkit/cfr/backlog.md b/doc/cfr/backlog.md
similarity index 78%
rename from cratedb_toolkit/cfr/backlog.md
rename to doc/cfr/backlog.md
index c71f517a..5d74bbfc 100644
--- a/cratedb_toolkit/cfr/backlog.md
+++ b/doc/cfr/backlog.md
@@ -1,4 +1,4 @@
-# CFR Backlog
+# CrateDB CFR Backlog

 ## Iteration +1
 - Software tests
@@ -16,6 +16,8 @@
 - How do I get at `crate.yaml`?
 - How do I get at the log files? `docker logs`?
 - Use OpenTelemetry traces in one way or another?
+- Possibly tap into profiling, using JFR, [profefe](https://github.com/profefe/profefe),
+  and/or [Grafana Pyroscope](https://github.com/grafana/pyroscope).

 ## Done
 - Cluster name must go into `cfr//sys/`, for multi-tenancy operations.

diff --git a/cratedb_toolkit/cfr/README.md b/doc/cfr/index.md
similarity index 91%
rename from cratedb_toolkit/cfr/README.md
rename to doc/cfr/index.md
index 6e7320ca..489bd134 100644
--- a/cratedb_toolkit/cfr/README.md
+++ b/doc/cfr/index.md
@@ -1,8 +1,14 @@
+(cfr)=
 # CrateDB Cluster Flight Recorder (CFR)

 CFR helps collect information about CrateDB clusters for support requests
 and self-service debugging.

+## Install +```shell +pip install --upgrade cratedb-toolkit +``` +Alternatively, use the Docker image at `ghcr.io/crate-workbench/cratedb-toolkit`. ## Synopsis @@ -25,7 +31,7 @@ ctk cfr sys-import file://./cfr/crate/2024-04-18T01-13-41/sys ``` -## Configuration +## Usage ### Target and source directories @@ -83,3 +89,11 @@ Import system table information. ```shell cfr sys-import cfr/crate/2024-04-18T01-13-41/sys ``` + + +```{toctree} +:maxdepth: 1 +:hidden: + +backlog +``` diff --git a/doc/index.md b/doc/index.md index 55a7a7fc..f18e8d16 100644 --- a/doc/index.md +++ b/doc/index.md @@ -25,6 +25,8 @@ install datasets io/index retention +Cluster Flight Recorder (CFR) +Ad Hoc Diagnosis (WTF) ``` ```{toctree} diff --git a/cratedb_toolkit/wtf/backlog.md b/doc/wtf/backlog.md similarity index 71% rename from cratedb_toolkit/wtf/backlog.md rename to doc/wtf/backlog.md index 9f0c82e2..692c592c 100644 --- a/cratedb_toolkit/wtf/backlog.md +++ b/doc/wtf/backlog.md @@ -1,8 +1,13 @@ -# cratedb-wtf backlog +# CrateDB WTF Backlog ## Iteration +1 - Display differences to the standard configuration - `tail -f` for `sys.jobs_log` and friends + See [discussion and suggestions](https://github.com/crate-workbench/cratedb-toolkit/pull/88#pullrequestreview-1759838520). +- Check if the patch includes relevant details from here. + https://community.cratedb.com/t/monitoring-an-on-premises-cratedb-cluster-with-prometheus-and-grafana/1236 +- Inform about table sizes, like Admin UI is doing it. +- Inform about [shard imbalances](https://community.cratedb.com/t/cratedb-database-logs-showing-shard-is-now-inactive-and-threads-are-getting-blocked/1617/16). ## Iteration +2 - Make `cratedb-wtf logs` also optionally consider `sys.` tables. diff --git a/cratedb_toolkit/wtf/README.md b/doc/wtf/index.md similarity index 82% rename from cratedb_toolkit/wtf/README.md rename to doc/wtf/index.md index 92ba946f..3091a934 100644 --- a/cratedb_toolkit/wtf/README.md +++ b/doc/wtf/index.md @@ -1,8 +1,14 @@ -# cratedb-wtf +(wtf)= +# CrateDB WTF A diagnostics utility in the spirit of [git-wtf], [grafana-wtf], and [pip.wtf]. It is still a work-in-progress, but it is usable already. +## Install +```shell +pip install --upgrade cratedb-toolkit +``` +Alternatively, use the Docker image at `ghcr.io/crate-workbench/cratedb-toolkit`. 
## Synopsis @@ -50,6 +56,14 @@ http http://127.0.0.1:4242/info/all ``` +```{toctree} +:maxdepth: 1 +:hidden: + +backlog +``` + + [git-wtf]: http://thrawn01.org/posts/2014/03/03/git-wtf/ [grafana-wtf]: https://github.com/panodata/grafana-wtf From 525a89ba6a6c281c2dab04cb1e645d4c39f287cd Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 4 Jun 2024 18:40:28 +0200 Subject: [PATCH 09/15] Chore: Add `build` and `doc/_build` folders to .gitignore --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index e06bc42b..0b18349a 100644 --- a/.gitignore +++ b/.gitignore @@ -6,9 +6,10 @@ .venv* __pycache__ dist +build .coverage* coverage.xml /cfr -/foo +/doc/_build /tmp /DOWNLOAD From 241bde26761ec089a9fecd8799e2ba8c7ff80ae8 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 4 Jun 2024 18:42:15 +0200 Subject: [PATCH 10/15] cfr: Fix software tests by correcting copy/paste error --- tests/cfr/test_cli.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/cfr/test_cli.py b/tests/cfr/test_cli.py index a282af29..4a6d95a8 100644 --- a/tests/cfr/test_cli.py +++ b/tests/cfr/test_cli.py @@ -43,10 +43,10 @@ def test_cfr_cli_export(cratedb, tmp_path, caplog): assert filenames(path) == ["data", "schema"] schema_files = filenames(path / "schema") - data_files = filenames(path / "schema") + data_files = filenames(path / "data") assert len(schema_files) >= 19 - assert len(data_files) >= 19 + assert len(data_files) >= 10 def test_cfr_cli_import(cratedb, tmp_path, caplog): From 9df11aadb8548a4c4bf20e657e7985cb89641e68 Mon Sep 17 00:00:00 2001 From: Sebastian Utz Date: Fri, 24 May 2024 13:15:14 +0200 Subject: [PATCH 11/15] cfr: Add missing dependencies --- .github/workflows/main.yml | 2 +- doc/cfr/index.md | 2 +- doc/wtf/index.md | 7 ++++++- pyproject.toml | 16 ++++++++++++++-- 4 files changed, 22 insertions(+), 5 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 730bc6fc..f2d5867e 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -65,7 +65,7 @@ jobs: pip install "setuptools>=64" --upgrade # Install package in editable mode. - pip install --use-pep517 --prefer-binary --editable=.[cloud,datasets,io,test,develop] + pip install --use-pep517 --prefer-binary --editable=.[full,test,develop] - name: Run linter and software tests env: diff --git a/doc/cfr/index.md b/doc/cfr/index.md index 489bd134..3152a666 100644 --- a/doc/cfr/index.md +++ b/doc/cfr/index.md @@ -6,7 +6,7 @@ and self-service debugging. ## Install ```shell -pip install --upgrade cratedb-toolkit +pip install --upgrade 'cratedb-toolkit[cfr]' ``` Alternatively, use the Docker image at `ghcr.io/crate-workbench/cratedb-toolkit`. diff --git a/doc/wtf/index.md b/doc/wtf/index.md index 3091a934..6113ac3a 100644 --- a/doc/wtf/index.md +++ b/doc/wtf/index.md @@ -6,7 +6,7 @@ It is still a work-in-progress, but it is usable already. ## Install ```shell -pip install --upgrade cratedb-toolkit +pip install --upgrade 'cratedb-toolkit' ``` Alternatively, use the Docker image at `ghcr.io/crate-workbench/cratedb-toolkit`. @@ -46,6 +46,11 @@ cratedb-wtf record ## HTTP API +Install. +```shell +pip install --upgrade 'cratedb-toolkit[service]' +``` + Expose collected status information. 
```shell cratedb-wtf --debug serve --reload diff --git a/pyproject.toml b/pyproject.toml index 29292f4d..b3d1d8a0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -96,15 +96,20 @@ dependencies = [ "polars<0.21", "python-dotenv<2", "python-slugify<9", + "pyyaml<7", "sqlalchemy-cratedb>=0.36.1,<1", "sqlparse<0.6", "tqdm<5", 'typing-extensions<5; python_version <= "3.7"', - "uvicorn<0.25", + "yarl<1.10", ] [project.optional-dependencies] all = [ - "cratedb-toolkit[cloud,datasets,influxdb,io,mongodb]", + "cratedb-toolkit[full,influxdb,mongodb]", +] +cfr = [ + "pandas<3,>=1", + "pyarrow<17", ] cloud = [ "croud==1.11.1", @@ -130,6 +135,9 @@ docs = [ "sphinxcontrib-mermaid<1", "sphinxext-opengraph<1", ] +full = [ + "cratedb-toolkit[cfr,cloud,datasets,io,service]", +] influxdb = [ "cratedb-toolkit[io]", "influxio>=0.2.1,<1", @@ -150,6 +158,10 @@ release = [ "build<2", "twine<6", ] +service = [ + "fastapi<0.105", + "uvicorn<0.25", +] test = [ "cratedb-toolkit[testing]", "pueblo[dataframe]", From b591a8f563b4aed7b59047378b7796b27aec0c8b Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 4 Jun 2024 23:39:45 +0200 Subject: [PATCH 12/15] wtf: Improve test cases --- cratedb_toolkit/sqlalchemy/patch.py | 21 ++++++--- cratedb_toolkit/wtf/cli.py | 17 +++++-- cratedb_toolkit/wtf/http.py | 2 +- cratedb_toolkit/wtf/query_collector.py | 15 ++++-- cratedb_toolkit/wtf/recorder.py | 1 + pyproject.toml | 2 + tests/conftest.py | 7 +++ tests/sqlalchemy/test_patch.py | 25 ++++++++++ tests/util/__init__.py | 0 tests/util/test_platform.py | 15 ++++++ tests/wtf/test_cli.py | 64 ++++++++++++++++++++++++++ tests/wtf/test_http.py | 43 +++++++++++++++++ 12 files changed, 195 insertions(+), 17 deletions(-) create mode 100644 tests/util/__init__.py create mode 100644 tests/util/test_platform.py create mode 100644 tests/wtf/test_http.py diff --git a/cratedb_toolkit/sqlalchemy/patch.py b/cratedb_toolkit/sqlalchemy/patch.py index c188a5a1..838df632 100644 --- a/cratedb_toolkit/sqlalchemy/patch.py +++ b/cratedb_toolkit/sqlalchemy/patch.py @@ -5,9 +5,15 @@ from decimal import Decimal from uuid import UUID -import numpy as np import sqlalchemy as sa +try: + import numpy as np + + has_numpy = True +except ImportError: + has_numpy = False + def patch_inspector(): """ @@ -68,10 +74,11 @@ def default(self, o): # NumPy ndarray and friends. # https://stackoverflow.com/a/49677241 - if isinstance(o, np.integer): - return int(o) - elif isinstance(o, np.floating): - return float(o) - elif isinstance(o, np.ndarray): - return o.tolist() + if has_numpy: + if isinstance(o, np.integer): + return int(o) + elif isinstance(o, np.floating): + return float(o) + elif isinstance(o, np.ndarray): + return o.tolist() return json.JSONEncoder.default(self, o) diff --git a/cratedb_toolkit/wtf/cli.py b/cratedb_toolkit/wtf/cli.py index 45c45cbc..54806432 100644 --- a/cratedb_toolkit/wtf/cli.py +++ b/cratedb_toolkit/wtf/cli.py @@ -144,8 +144,9 @@ def job_statistics(ctx: click.Context): @make_command(job_statistics, "collect", "Collect queries from sys.jobs_log.") +@click.option("--once", is_flag=True, default=False, required=False, help="Whether to record only one sample") @click.pass_context -def job_statistics_collect(ctx: click.Context): +def job_statistics_collect(ctx: click.Context, once: bool): """ Run jobs_log collector. 
@@ -153,7 +154,11 @@ def job_statistics_collect(ctx: click.Context): """ import cratedb_toolkit.wtf.query_collector - cratedb_toolkit.wtf.query_collector.main() + cratedb_toolkit.wtf.query_collector.init() + if once: + cratedb_toolkit.wtf.query_collector.record_once() + else: + cratedb_toolkit.wtf.query_collector.record_forever() @make_command(job_statistics, "view", "View job statistics about collected queries.") @@ -180,13 +185,17 @@ def job_statistics_view(ctx: click.Context): @make_command(cli, "record", "Record `info` and `job-info` outcomes.") +@click.option("--once", is_flag=True, default=False, required=False, help="Whether to record only one sample") @click.pass_context -def record(ctx: click.Context): +def record(ctx: click.Context, once: bool): cratedb_sqlalchemy_url = ctx.meta["cratedb_sqlalchemy_url"] scrub = ctx.meta.get("scrub", False) adapter = DatabaseAdapter(dburi=cratedb_sqlalchemy_url, echo=False) recorder = InfoRecorder(adapter=adapter, scrub=scrub) - recorder.record_forever() + if once: + recorder.record_once() + else: + recorder.record_forever() @make_command(cli, "serve", help_serve) diff --git a/cratedb_toolkit/wtf/http.py b/cratedb_toolkit/wtf/http.py index 1d70a4a7..f2a73011 100644 --- a/cratedb_toolkit/wtf/http.py +++ b/cratedb_toolkit/wtf/http.py @@ -2,9 +2,9 @@ # Distributed under the terms of the AGPLv3 license, see LICENSE. import logging import os -import typing as t from functools import lru_cache +import typing_extensions as t from fastapi import Depends, FastAPI, HTTPException from cratedb_toolkit.util import DatabaseAdapter diff --git a/cratedb_toolkit/wtf/query_collector.py b/cratedb_toolkit/wtf/query_collector.py index b630c977..d489f210 100644 --- a/cratedb_toolkit/wtf/query_collector.py +++ b/cratedb_toolkit/wtf/query_collector.py @@ -102,7 +102,7 @@ def init_stmts(stmts): def write_stats_to_db(): - logger.info("Writing statistics to database") + logger.info(f"Writing statistics to database table: {stmt_log_table}") write_query_stmt = ( f"INSERT INTO {stmt_log_table} " f"(id, stmt, calls, bucket, username, query_type, avg_duration, nodes, last_used) " @@ -230,18 +230,23 @@ def scrape_db(): last_scrape = next_scrape -def run(): +def record_once(): + logger.info("Recording information snapshot") scrape_db() write_stats_to_db() -def main(): - init() +def record_forever(): while True: - run() + record_once() logger.info(f"Sleeping for {interval} seconds") time.sleep(interval) +def main(): + init() + record_forever() + + if __name__ == "__main__": main() diff --git a/cratedb_toolkit/wtf/recorder.py b/cratedb_toolkit/wtf/recorder.py index 08cf5756..fa7e7c2c 100644 --- a/cratedb_toolkit/wtf/recorder.py +++ b/cratedb_toolkit/wtf/recorder.py @@ -55,4 +55,5 @@ def do_record_forever(self): self.record_once() except Exception: logger.exception("Failed to record information snapshot") + logger.info(f"Sleeping for {self.interval_seconds} seconds") time.sleep(self.interval_seconds) diff --git a/pyproject.toml b/pyproject.toml index b3d1d8a0..35615795 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -164,6 +164,7 @@ service = [ ] test = [ "cratedb-toolkit[testing]", + "httpx<0.28", "pueblo[dataframe]", "pytest<9", "pytest-cov<6", @@ -288,6 +289,7 @@ extend-exclude = [ [tool.ruff.lint.per-file-ignores] "doc/conf.py" = ["A001", "ERA001"] "tests/*" = ["S101"] # Allow use of `assert`, and `print`. 
+"tests/wtf/test_http.py" = ["E402"] "examples/*" = ["T201"] # Allow `print` "cratedb_toolkit/retention/cli.py" = ["T201"] # Allow `print` "cratedb_toolkit/sqlalchemy/__init__.py" = ["F401"] # Allow `module´ imported but unused diff --git a/tests/conftest.py b/tests/conftest.py index 1b498e58..a1d05df8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -14,6 +14,13 @@ TESTDRIVE_DATA_SCHEMA = "testdrive-data" TESTDRIVE_EXT_SCHEMA = "testdrive-ext" RESET_TABLES = [ + # FIXME: Let all subsystems use configured schema instead of hard-coded ones. + '"doc"."clusterinfo"', + '"doc"."jobinfo"', + '"ext"."clusterinfo"', + '"ext"."jobinfo"', + '"stats"."statement_log"', + '"stats"."last_execution"', f'"{TESTDRIVE_EXT_SCHEMA}"."retention_policy"', f'"{TESTDRIVE_DATA_SCHEMA}"."raw_metrics"', f'"{TESTDRIVE_DATA_SCHEMA}"."sensor_readings"', diff --git a/tests/sqlalchemy/test_patch.py b/tests/sqlalchemy/test_patch.py index 07d85d1b..44ca4cf2 100644 --- a/tests/sqlalchemy/test_patch.py +++ b/tests/sqlalchemy/test_patch.py @@ -1,6 +1,11 @@ +import datetime +import json + +import pytest import sqlalchemy as sa from cratedb_toolkit.sqlalchemy import patch_inspector +from cratedb_toolkit.sqlalchemy.patch import CrateJsonEncoderWithNumPy from tests.conftest import TESTDRIVE_DATA_SCHEMA @@ -40,3 +45,23 @@ def test_inspector_patched(database): table_names = inspector.get_table_names() assert "foobar" in table_names + + +def test_json_encoder_date(): + """ + Verify the extended JSON encoder also accepts Python's `date` types. + """ + data = {"date": datetime.date(2024, 6, 4)} + encoded = json.dumps(data, cls=CrateJsonEncoderWithNumPy) + assert encoded == '{"date": 1717459200000}' + + +def test_json_encoder_numpy(): + """ + Verify the extended JSON encoder also accepts NumPy types. + """ + np = pytest.importorskip("numpy") + + data = {"scalar-int": np.float32(42.42).astype(int), "scalar-float": np.float32(42.42), "ndarray": np.ndarray([1])} + encoded = json.dumps(data, cls=CrateJsonEncoderWithNumPy) + assert encoded == """{"scalar-int": 42, "scalar-float": 42.41999816894531, "ndarray": [2.08e-322]}""" diff --git a/tests/util/__init__.py b/tests/util/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/util/test_platform.py b/tests/util/test_platform.py new file mode 100644 index 00000000..935e4daf --- /dev/null +++ b/tests/util/test_platform.py @@ -0,0 +1,15 @@ +from cratedb_toolkit.util.platform import PlatformInfo + + +def test_platforminfo_application(): + pi = PlatformInfo() + outcome = pi.application() + assert "name" in outcome + assert "version" in outcome + assert "platform" in outcome + + +def test_platforminfo_libraries(): + pi = PlatformInfo() + outcome = pi.libraries() + assert isinstance(outcome, dict) diff --git a/tests/wtf/test_cli.py b/tests/wtf/test_cli.py index b9620cd2..9988f016 100644 --- a/tests/wtf/test_cli.py +++ b/tests/wtf/test_cli.py @@ -2,6 +2,7 @@ from boltons.iterutils import get_path from click.testing import CliRunner +from yarl import URL from cratedb_toolkit.wtf.cli import cli @@ -88,6 +89,40 @@ def test_wtf_cli_job_info(cratedb): assert "performance15min" in data_keys +def test_wtf_cli_statistics_collect(cratedb, caplog): + """ + Verify `cratedb-wtf job-statistics collect`. + """ + + uri = URL(cratedb.database.dburi) + + # Invoke command. 
+ runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb.database.dburi}) + result = runner.invoke( + cli, + args="job-statistics collect --once", + env={"HOSTNAME": f"{uri.host}:{uri.port}"}, + catch_exceptions=False, + ) + assert result.exit_code == 0 + + # Verify outcome: Log output. + assert "Recording information snapshot" in caplog.messages + + # Verify outcome: Database content. + # stats.statement_log, stats.last_execution + results = cratedb.database.run_sql("SHOW TABLES", records=True) + assert {"table_name": "last_execution"} in results + assert {"table_name": "statement_log"} in results + + # FIXME: Table is empty. Why? + cratedb.database.run_sql('REFRESH TABLE "stats"."statement_log"') + assert cratedb.database.count_records("stats.statement_log") == 0 + + cratedb.database.run_sql('REFRESH TABLE "stats"."last_execution"') + assert cratedb.database.count_records("stats.last_execution") == 1 + + def test_wtf_cli_statistics_view(cratedb): """ Verify `cratedb-wtf job-statistics view`. @@ -109,3 +144,32 @@ def test_wtf_cli_statistics_view(cratedb): data_keys = list(info["data"].keys()) assert "stats" in data_keys + + +def test_wtf_cli_record(cratedb, caplog): + """ + Verify `cratedb-wtf record`. + """ + + # Invoke command. + runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb.database.dburi}) + result = runner.invoke( + cli, + args="--debug record --once", + catch_exceptions=False, + ) + assert result.exit_code == 0 + + # Verify outcome: Log output. + assert "Recording information snapshot" in caplog.messages + + # Verify outcome: Database content. + results = cratedb.database.run_sql("SHOW TABLES", records=True) + assert {"table_name": "clusterinfo"} in results + assert {"table_name": "jobinfo"} in results + + cratedb.database.run_sql('REFRESH TABLE "ext"."clusterinfo"') + assert cratedb.database.count_records("ext.clusterinfo") == 1 + + cratedb.database.run_sql('REFRESH TABLE "ext"."jobinfo"') + assert cratedb.database.count_records("ext.jobinfo") == 1 diff --git a/tests/wtf/test_http.py b/tests/wtf/test_http.py new file mode 100644 index 00000000..a670e769 --- /dev/null +++ b/tests/wtf/test_http.py @@ -0,0 +1,43 @@ +import datetime as dt +import os + +import pytest + +pytest.importorskip("fastapi") + + +from fastapi.testclient import TestClient + +from cratedb_toolkit import __appname__, __version__ +from cratedb_toolkit.wtf.http import app + +client = TestClient(app) + + +def test_http_root(): + response = client.get("/") + data = response.json() + assert response.status_code == 200 + assert data["application_name"] == __appname__ + assert data["application_version"].startswith(__version__) + assert dt.datetime.fromisoformat(data["system_time"]).year == dt.datetime.now().year + + +def test_http_info(cratedb, mocker): + mocker.patch.dict(os.environ, {"CRATEDB_SQLALCHEMY_URL": cratedb.database.dburi}) + + response = client.get("/info/all") + info = response.json() + assert response.status_code == 200 + + assert info["meta"]["application_name"] == __appname__ + assert info["meta"]["application_version"].startswith(__version__) + assert dt.datetime.fromisoformat(info["meta"]["system_time"]).year == dt.datetime.now().year + assert "elements" in info["meta"] + assert len(info["meta"]["elements"]) > 15 + + assert "data" in info + assert info["data"]["database"]["cluster_nodes_count"] == 1 + assert info["data"]["system"]["application"]["name"] == __appname__ + + assert "eco" in info["data"]["system"] From 631c192869883d484d75546815c38f1990f9ae34 Mon Sep 17 00:00:00 2001 From: 
Andreas Motl Date: Wed, 5 Jun 2024 01:06:37 +0200 Subject: [PATCH 13/15] cfr: Improve test cases --- tests/cfr/test_cli.py | 42 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 40 insertions(+), 2 deletions(-) diff --git a/tests/cfr/test_cli.py b/tests/cfr/test_cli.py index 4a6d95a8..2ef639e9 100644 --- a/tests/cfr/test_cli.py +++ b/tests/cfr/test_cli.py @@ -20,7 +20,7 @@ def filenames(path: Path): return sorted([item.name for item in path.iterdir()]) -def test_cfr_cli_export(cratedb, tmp_path, caplog): +def test_cfr_cli_export_success(cratedb, tmp_path, caplog): """ Verify `ctk cfr sys-export` works. """ @@ -49,7 +49,26 @@ def test_cfr_cli_export(cratedb, tmp_path, caplog): assert len(data_files) >= 10 -def test_cfr_cli_import(cratedb, tmp_path, caplog): +def test_cfr_cli_export_failure(cratedb, tmp_path, caplog): + """ + Verify `ctk cfr sys-export` failure. + """ + + # Invoke command. + runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": "crate://foo.bar/", "CFR_TARGET": str(tmp_path)}) + result = runner.invoke( + cli, + args="--debug sys-export", + catch_exceptions=False, + ) + assert result.exit_code == 1 + + # Verify log output. + assert "Failed to establish a new connection" in caplog.text or "Failed to resolve" in caplog.text + assert result.output == "" + + +def test_cfr_cli_import_success(cratedb, tmp_path, caplog): """ Verify `ctk cfr sys-import` works. """ @@ -108,3 +127,22 @@ def test_cfr_cli_import(cratedb, tmp_path, caplog): cratedb.database.run_sql('REFRESH TABLE "sys-operations"') assert cratedb.database.count_records("sys-operations") == 1 + + +def test_cfr_cli_import_failure(cratedb, tmp_path, caplog): + """ + Verify `ctk cfr sys-import` failure. + """ + + # Invoke command. + runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": "crate://foo.bar/", "CFR_SOURCE": str(tmp_path)}) + result = runner.invoke( + cli, + args="--debug sys-import", + catch_exceptions=False, + ) + assert result.exit_code == 1 + + # Verify log output. 
+    assert "Failed to establish a new connection" in caplog.text or "Failed to resolve" in caplog.text
+    assert result.output == ""

From f4821864db24ec80e3a37f07c7af3efd6a361c21 Mon Sep 17 00:00:00 2001
From: Andreas Motl
Date: Thu, 13 Jun 2024 12:52:14 +0200
Subject: [PATCH 14/15] wtf: Improve `wtf serve`

- Forward CLI options `--listen` and `--reload` to implementation
- Improve documentation about the `--listen` and `--reload` options
- Assign random port when `--listen` is supplied without port number
---
 cratedb_toolkit/util/service.py | 3 ++-
 cratedb_toolkit/wtf/http.py | 2 +-
 doc/wtf/index.md | 13 ++++++++++++-
 3 files changed, 15 insertions(+), 3 deletions(-)

diff --git a/cratedb_toolkit/util/service.py b/cratedb_toolkit/util/service.py
index a26c70b4..d2dc4440 100644
--- a/cratedb_toolkit/util/service.py
+++ b/cratedb_toolkit/util/service.py
@@ -15,7 +15,8 @@ def start_service(app: str, listen_address: t.Union[str, None] = None, reload: b
     if listen_address is None:
         listen_address = "127.0.0.1:4242"
 
-    host, port = listen_address.split(":")
+    host, _, port = listen_address.partition(":")
+    port = port or "0"
     port_int = int(port)
 
     logger.info(f"Starting HTTP web service on http://{listen_address}")
diff --git a/cratedb_toolkit/wtf/http.py b/cratedb_toolkit/wtf/http.py
index f2a73011..57d7a15a 100644
--- a/cratedb_toolkit/wtf/http.py
+++ b/cratedb_toolkit/wtf/http.py
@@ -38,4 +38,4 @@
 
 
 def start(listen_address: t.Union[str, None] = None, reload: bool = False):  # pragma: no cover
-    start_service(app="cratedb_toolkit.wtf.http:app")
+    start_service(app="cratedb_toolkit.wtf.http:app", listen_address=listen_address, reload=reload)
diff --git a/doc/wtf/index.md b/doc/wtf/index.md
index 6113ac3a..2ef02429 100644
--- a/doc/wtf/index.md
+++ b/doc/wtf/index.md
@@ -51,7 +51,7 @@ Install.
 pip install --upgrade 'cratedb-toolkit[service]'
 ```
 
-Expose collected status information. 
+Expose collected status information.
 ```shell
 cratedb-wtf --debug serve --reload
 ```
@@ -60,6 +60,17 @@ Consume collected status information via HTTP.
 http http://127.0.0.1:4242/info/all
 ```
 
+Make the service listen on a specific address.
+```shell
+ctk wtf serve --listen 0.0.0.0:8042
+```
+
+:::{note}
+The `--reload` option is intended for development scenarios: the service
+picks up code changes automatically while you are editing, in near
+real time.
+:::
+
 ```{toctree}
 :maxdepth: 1
 

From 5a9ef0ee65e407342d3982c4dd1bcfebb2ce4f8e Mon Sep 17 00:00:00 2001
From: Andreas Motl
Date: Thu, 13 Jun 2024 14:19:00 +0200
Subject: [PATCH 15/15] wtf: Simplify SQL statement `shard_allocation`

> This looks unreasonably complicated to just translate the primary boolean
> into a string.
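As a reading aid, not part of the patch itself: grouping `sys.allocations`
directly means a shard type without pending allocations no longer shows up
as a zero-count row, which the previous UNNEST + LEFT JOIN variant
guaranteed. A hedged sketch of how a consumer could restore that
zero-filling, assuming the `DatabaseAdapter.run_sql(..., records=True)`
helper used elsewhere in this patch series, and an illustrative
connection URL:

```python
from cratedb_toolkit.util import DatabaseAdapter

SHARD_ALLOCATION = """
    SELECT
        IF(primary = TRUE, 'primary', 'replica') AS shard_type,
        COUNT(*) AS shards
    FROM sys.allocations
    WHERE current_state != 'STARTED'
    GROUP BY 1;
"""

# The connection URL is an assumption for illustration purposes.
adapter = DatabaseAdapter(dburi="crate://localhost:4200/")
records = adapter.run_sql(SHARD_ALLOCATION, records=True)

# Map absent shard types to zero, mirroring the old COALESCE behavior.
counts = {"primary": 0, "replica": 0}
counts.update({record["shard_type"]: record["shards"] for record in records})
```
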
Co-authored-by: Sebastian Utz --- cratedb_toolkit/wtf/library.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/cratedb_toolkit/wtf/library.py b/cratedb_toolkit/wtf/library.py index b2be3697..387a5b82 100644 --- a/cratedb_toolkit/wtf/library.py +++ b/cratedb_toolkit/wtf/library.py @@ -423,16 +423,11 @@ class Shards: name="shard_allocation", sql=""" SELECT - IF(s.primary = TRUE, 'primary', 'replica') AS shard_type, - COALESCE(shards, 0) AS shards - FROM - UNNEST([true, false]) s(primary) - LEFT JOIN ( - SELECT primary, COUNT(*) AS shards + IF(primary = TRUE, 'primary', 'replica') AS shard_type, + COUNT(*) AS shards FROM sys.allocations WHERE current_state != 'STARTED' GROUP BY 1 - ) a ON s.primary = a.primary; """, label="Shard Allocation", description="Support identifying issues with shard allocation.",
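For reference, a condensed sketch of the listen-address handling that
PATCH 14 forwards into `start_service`: `str.partition` keeps a missing
port as an empty string, which is then mapped to `0`, i.e. the operating
system picks a random free port. The helper name below is hypothetical;
only the parsing logic mirrors the patch.

```python
import typing as t


def parse_listen_address(listen_address: t.Union[str, None]) -> t.Tuple[str, int]:
    """Hypothetical helper mirroring the parsing logic in `start_service`."""
    if listen_address is None:
        listen_address = "127.0.0.1:4242"
    # `partition` tolerates a missing ":<port>" suffix; an empty port
    # becomes 0, which asks the operating system for a random free port.
    host, _, port = listen_address.partition(":")
    return host, int(port or "0")


assert parse_listen_address(None) == ("127.0.0.1", 4242)
assert parse_listen_address("0.0.0.0:8042") == ("0.0.0.0", 8042)
assert parse_listen_address("localhost") == ("localhost", 0)  # random port
```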
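Similarly, the `--once` flag introduced for `record` and
`job-statistics collect` earlier in this series reduces to one pattern:
take a single sample, or loop forever with an interval sleep, guarding
each iteration so one failure does not stop the collector. A minimal
sketch; the `interval` value and the snapshot body are stand-ins:

```python
import logging
import time

logger = logging.getLogger(__name__)
interval = 60  # Stand-in; the real collectors read this from their configuration.


def record_once():
    logger.info("Recording information snapshot")
    # The real implementations scrape system tables and persist statistics here.


def record_forever():
    while True:
        try:
            record_once()
        except Exception:
            # Keep sampling even if a single snapshot fails, as
            # `InfoRecorder.do_record_forever` does.
            logger.exception("Failed to record information snapshot")
        logger.info(f"Sleeping for {interval} seconds")
        time.sleep(interval)
```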