diff --git a/.gitignore b/.gitignore index efc0473d..c228796a 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,5 @@ __pycache__ dist .coverage* coverage.xml +/foo +/tmp diff --git a/CHANGES.md b/CHANGES.md index 1a80dc53..16b6d75e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -14,6 +14,7 @@ - MongoDB: Improve UX by using `ctk load table mongodb://...` - load table: Refactor to use more OO - Add `examples/cloud_import.py` +- Add `cratedb-wtf` diagnostics program ## 2023/11/06 v0.0.2 diff --git a/cratedb_toolkit/cli.py b/cratedb_toolkit/cli.py index 26c0ef65..9315a701 100644 --- a/cratedb_toolkit/cli.py +++ b/cratedb_toolkit/cli.py @@ -7,6 +7,7 @@ from .io.cli import cli as io_cli from .job.cli import cli_list_jobs from .shell.cli import cli as shell_cli +from .wtf.cli import cli as wtf_cli @click.group(cls=ClickAliasedGroup) # type: ignore[arg-type] @@ -21,4 +22,5 @@ def cli(ctx: click.Context, verbose: bool, debug: bool): cli.add_command(cloud_cli, name="cluster") cli.add_command(io_cli, name="load") cli.add_command(shell_cli, name="shell") +cli.add_command(wtf_cli, name="wtf") cli.add_command(cli_list_jobs) diff --git a/cratedb_toolkit/util/cli.py b/cratedb_toolkit/util/cli.py index 9ebcb3ab..f5956dea 100644 --- a/cratedb_toolkit/util/cli.py +++ b/cratedb_toolkit/util/cli.py @@ -72,13 +72,15 @@ def boot_with_dburi(): return dburi -def make_command(cli, name, helpfun=None, aliases=None): +def make_command(cli, name, help=None, aliases=None): # noqa: A002 """ Convenience shortcut for creating a subcommand. """ kwargs = {} - if helpfun: - kwargs["help"] = docstring_format_verbatim(helpfun.__doc__) + if isinstance(help, str): + kwargs["help"] = help + elif callable(help): + kwargs["help"] = docstring_format_verbatim(help.__doc__) return cli.command( name, context_settings={"max_content_width": 120}, diff --git a/cratedb_toolkit/util/data.py b/cratedb_toolkit/util/data.py index 3a4d67d1..62cdbb33 100644 --- a/cratedb_toolkit/util/data.py +++ b/cratedb_toolkit/util/data.py @@ -1,3 +1,4 @@ +import datetime as dt import json import sys import typing as t @@ -7,7 +8,7 @@ def jd(data: t.Any): """ Pretty-print JSON with indentation. """ - print(json.dumps(data, indent=2), file=sys.stdout) # noqa: T201 + print(json.dumps(data, indent=2, cls=JSONEncoderPlus), file=sys.stdout) # noqa: T201 def str_contains(haystack, *needles): @@ -16,3 +17,15 @@ def str_contains(haystack, *needles): """ haystack = str(haystack) return any(needle in haystack for needle in needles) + + +class JSONEncoderPlus(json.JSONEncoder): + """ + https://stackoverflow.com/a/27058505 + """ + + def default(self, o): + if isinstance(o, dt.datetime): + return o.isoformat() + + return json.JSONEncoder.default(self, o) diff --git a/cratedb_toolkit/util/platform.py b/cratedb_toolkit/util/platform.py new file mode 100644 index 00000000..0c588dfd --- /dev/null +++ b/cratedb_toolkit/util/platform.py @@ -0,0 +1,56 @@ +import io +import json +from contextlib import redirect_stdout + + +class PlatformInfo: + @staticmethod + def application(): + import platform + + from cratedb_toolkit import __appname__, __version__ + + data = {} + + data["platform"] = platform.platform() + data["version"] = __version__ + data["name"] = __appname__ + return data + + @staticmethod + def libraries(): + data = {} + + # SQLAlchemy + from importlib.metadata import entry_points + + try: + import sqlalchemy.dialects.plugins + import sqlalchemy.dialects.registry + + data["sqlalchemy"] = { + "dialects_builtin": list(sqlalchemy.dialects.registry.impls.keys()), + "dialects_3rdparty": [dialect.name for dialect in entry_points(group="sqlalchemy.dialects")], # type: ignore[attr-defined,call-arg] + "plugins": list(sqlalchemy.dialects.plugins.impls.keys()), + } + except Exception: # noqa: S110 + pass + + # pandas + try: + import pandas + + buffer = io.StringIO() + with redirect_stdout(buffer): + pandas.show_versions(as_json=True) + buffer.seek(0) + data["pandas"] = json.load(buffer) + except Exception: # noqa: S110 + pass + + # fsspec + import fsspec + + data["fsspec"] = {"protocols": fsspec.available_protocols(), "compressions": fsspec.available_compressions()} + + return data diff --git a/cratedb_toolkit/util/service.py b/cratedb_toolkit/util/service.py new file mode 100644 index 00000000..a26c70b4 --- /dev/null +++ b/cratedb_toolkit/util/service.py @@ -0,0 +1,23 @@ +# Copyright (c) 2021-2023, Crate.io Inc. +# Distributed under the terms of the AGPLv3 license, see LICENSE. +import logging +import typing as t + +from cratedb_toolkit.util.common import setup_logging + +logger = logging.getLogger(__name__) + + +def start_service(app: str, listen_address: t.Union[str, None] = None, reload: bool = False): # pragma: no cover + setup_logging() + from uvicorn import run + + if listen_address is None: + listen_address = "127.0.0.1:4242" + + host, port = listen_address.split(":") + port_int = int(port) + + logger.info(f"Starting HTTP web service on http://{listen_address}") + + run(app=app, host=host, port=port_int, reload=reload) diff --git a/cratedb_toolkit/wtf/README.md b/cratedb_toolkit/wtf/README.md new file mode 100644 index 00000000..f06a4774 --- /dev/null +++ b/cratedb_toolkit/wtf/README.md @@ -0,0 +1,47 @@ +# cratedb-wtf + +A diagnostics utility in the spirit of [git-wtf], [grafana-wtf], and [pip.wtf]. +It is still a work-in-progress, but it is usable already. + + +## Synopsis + +Define CrateDB database cluster address. +```shell +export CRATEDB_SQLALCHEMY_URL=crate://localhost/ +``` + +Display system and database cluster information. +```shell +cratedb-wtf info +``` + +Display database cluster log messages. +```shell +cratedb-wtf logs +``` + +Statistics. +```shell +cratedb-wtf job-statistics quick +cratedb-wtf job-statistics collect +cratedb-wtf job-statistics view +``` + + +## HTTP API + +Expose collected status information. +```shell +cratedb-wtf --debug serve --reload +``` +Consume collected status information via HTTP. +```shell +http http://127.0.0.1:4242/info/all +``` + + + +[git-wtf]: http://thrawn01.org/posts/2014/03/03/git-wtf/ +[grafana-wtf]: https://github.com/panodata/grafana-wtf +[pip.wtf]: https://github.com/sabslikesobs/pip.wtf diff --git a/cratedb_toolkit/wtf/__init__.py b/cratedb_toolkit/wtf/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cratedb_toolkit/wtf/backlog.md b/cratedb_toolkit/wtf/backlog.md new file mode 100644 index 00000000..9c616cfa --- /dev/null +++ b/cratedb_toolkit/wtf/backlog.md @@ -0,0 +1,34 @@ +# cratedb-wtf backlog + +## Iteration +1 +- Expose collected data via Glances-like UI +- Experimental UI using Grafana Scenes + +## Iteration +2 +- Make `cratedb-wtf logs` also optionally consider `sys.` tables. +- cratedb-wtf explore table|shard|partition|node +- High-level analysis, evaluating a set of threshold rules +- Network diagnostics? + +## Iteration +3 +- Make it work with CrateDB Cloud. + ``` + ctk cluster info + ctk cluster health + ctk cluster logs --slow-queries + ``` + +## Done +- Make it work +- Proper marshalling of timestamp values (ISO 8601) +- Expose collected data via HTTP API + ``` + cratedb-wtf serve + ``` +- Provide `scrub` option also via HTTP +- Complete collected queries and code snippets +- Harvest queries from Admin UI, crash, crate-admin-alt +- Harvest queries from experts + - https://tools.cr8.net/grafana/d/RkpNJx84z/cratedb-jobs-log?orgId=1&refresh=5m&var-datasource=crate-production + - https://tools.cr8.net/grafana/d/RkpNJx84z/cratedb-jobs-log?orgId=1&refresh=5m&var-datasource=crate-production&viewPanel=44 +- Add `description` and `unit` fields to each `InfoElement` definition diff --git a/cratedb_toolkit/wtf/cli.py b/cratedb_toolkit/wtf/cli.py new file mode 100644 index 00000000..5b2761db --- /dev/null +++ b/cratedb_toolkit/wtf/cli.py @@ -0,0 +1,188 @@ +# Copyright (c) 2021-2023, Crate.io Inc. +# Distributed under the terms of the AGPLv3 license, see LICENSE. +import logging +import os +import sys +import typing as t +import urllib.parse + +import click +from click_aliases import ClickAliasedGroup + +from cratedb_toolkit.util import DatabaseAdapter +from cratedb_toolkit.util.cli import ( + boot_click, + make_command, +) +from cratedb_toolkit.util.data import jd +from cratedb_toolkit.wtf.core import InfoContainer, JobInfoContainer, LogContainer + +logger = logging.getLogger(__name__) + + +def help_info(): + """ + Database cluster and system information. + + Synopsis + ======== + + export CRATEDB_SQLALCHEMY_URL=crate://localhost/ + cratedb-wtf info + + """ # noqa: E501 + + +def help_logs(): + """ + Database cluster logs. + + Synopsis + ======== + + export CRATEDB_SQLALCHEMY_URL=crate://localhost/ + cratedb-wtf logs + + """ # noqa: E501 + + +def help_statistics(): + """ + Database cluster job / query statistics. + + Synopsis + ======== + + export CRATEDB_SQLALCHEMY_URL=crate://localhost/ + cratedb-wtf job-statistics quick + cratedb-wtf job-statistics collect + cratedb-wtf job-statistics view + + """ # noqa: E501 + + +def help_serve(): + """ + Start HTTP service to expose collected information. + + Synopsis + ======== + + export CRATEDB_SQLALCHEMY_URL=crate://localhost/ + cratedb-wtf serve + + """ # noqa: E501 + + +cratedb_sqlalchemy_option = click.option( + "--cratedb-sqlalchemy-url", envvar="CRATEDB_SQLALCHEMY_URL", type=str, required=False, help="CrateDB SQLAlchemy URL" +) + + +@click.group(cls=ClickAliasedGroup) # type: ignore[arg-type] +@cratedb_sqlalchemy_option +@click.option("--verbose", is_flag=True, required=False, help="Turn on logging") +@click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level") +@click.option("--scrub", envvar="SCRUB", is_flag=True, required=False, help="Blank out identifiable information") +@click.version_option() +@click.pass_context +def cli(ctx: click.Context, cratedb_sqlalchemy_url: str, verbose: bool, debug: bool, scrub: bool): + """ + Diagnostics and informational utilities. + """ + if not cratedb_sqlalchemy_url: + logger.error("Unable to operate without database address") + sys.exit(1) + ctx.meta.update({"cratedb_sqlalchemy_url": cratedb_sqlalchemy_url, "scrub": scrub}) + return boot_click(ctx, verbose, debug) + + +@make_command(cli, "info", help_info) +@click.pass_context +def info(ctx: click.Context): + cratedb_sqlalchemy_url = ctx.meta["cratedb_sqlalchemy_url"] + scrub = ctx.meta.get("scrub", False) + adapter = DatabaseAdapter(dburi=cratedb_sqlalchemy_url) + sample = InfoContainer(adapter=adapter, scrub=scrub) + jd(sample.to_dict()) + + +@make_command(cli, "logs", help_logs) +@click.pass_context +def logs(ctx: click.Context): + cratedb_sqlalchemy_url = ctx.meta["cratedb_sqlalchemy_url"] + scrub = ctx.meta.get("scrub", False) + adapter = DatabaseAdapter(dburi=cratedb_sqlalchemy_url) + sample = LogContainer(adapter=adapter, scrub=scrub) + jd(sample.to_dict()) + + +@make_command(cli, "job-info", "Display information about jobs / queries.") +@click.pass_context +def job_information(ctx: click.Context): + """ + Display ad hoc job information. + """ + cratedb_sqlalchemy_url = ctx.meta["cratedb_sqlalchemy_url"] + scrub = ctx.meta.get("scrub", False) + adapter = DatabaseAdapter(dburi=cratedb_sqlalchemy_url) + sample = JobInfoContainer(adapter=adapter, scrub=scrub) + jd(sample.to_dict()) + + +@click.group(cls=ClickAliasedGroup) # type: ignore[arg-type] +@click.pass_context +def job_statistics(ctx: click.Context): + """ + Collect and display statistics about jobs / queries. + """ + pass + + +cli.add_command(job_statistics, name="job-statistics", aliases=["jobstats"]) + + +@make_command(job_statistics, "collect", "Collect queries from sys.jobs_log.") +@click.pass_context +def job_statistics_collect(ctx: click.Context): + """ + Run jobs_log collector. + + # TODO: Forward `cratedb_sqlalchemy_url` properly. + """ + import cratedb_toolkit.wtf.query_collector + + cratedb_toolkit.wtf.query_collector.main() + + +@make_command(job_statistics, "view", "View job statistics about collected queries.") +@click.pass_context +def job_statistics_view(ctx: click.Context): + """ + View job statistics about collected queries. + + # TODO: Forward `cratedb_sqlalchemy_url` properly. + """ + cratedb_sqlalchemy_url = ctx.meta["cratedb_sqlalchemy_url"] + url = urllib.parse.urlparse(cratedb_sqlalchemy_url) + hostname = f"{url.hostname}:{url.port or 4200}" + os.environ["HOSTNAME"] = hostname + + import cratedb_toolkit.wtf.query_collector + + cratedb_toolkit.wtf.query_collector.init() + + response: t.Dict = {"meta": {}, "data": {}} + response["meta"]["remark"] = "WIP! This is a work in progress. The output format will change." + response["data"]["stats"] = cratedb_toolkit.wtf.query_collector.read_stats() + jd(response) + + +@make_command(cli, "serve", help_serve) +@click.option("--listen", type=click.STRING, default=None, help="HTTP server listen address") +@click.option("--reload", is_flag=True, help="Dynamically reload changed files") +@click.pass_context +def serve(ctx: click.Context, listen: str, reload: bool): + from cratedb_toolkit.wtf.http import start + + start(listen, reload=reload) diff --git a/cratedb_toolkit/wtf/core.py b/cratedb_toolkit/wtf/core.py new file mode 100644 index 00000000..a69fcb0b --- /dev/null +++ b/cratedb_toolkit/wtf/core.py @@ -0,0 +1,81 @@ +# Copyright (c) 2021-2024, Crate.io Inc. +# Distributed under the terms of the AGPLv3 license, see LICENSE. +import boltons.ecoutils + +from cratedb_toolkit.util.platform import PlatformInfo +from cratedb_toolkit.wtf.library import Library +from cratedb_toolkit.wtf.model import InfoContainerBase + + +class InfoContainer(InfoContainerBase): + def register_builtins(self): + self.elements.add( + # General cluster health information. + Library.Health.cluster_name, + Library.Health.nodes_count, + Library.Health.nodes_list, + Library.Health.table_health, + Library.Health.backups_recent, + # Shard / node / partition allocation and rebalancing information. + Library.Shards.allocation, + Library.Shards.table_allocation, + Library.Shards.node_shard_distribution, + Library.Shards.table_shard_count, + Library.Shards.rebalancing_progress, + Library.Shards.rebalancing_status, + Library.Shards.not_started, + Library.Shards.not_started_count, + Library.Shards.max_checkpoint_delta, + Library.Shards.total_count, + Library.Shards.translog_uncommitted, + Library.Shards.translog_uncommitted_size, + ) + + def to_dict(self, data=None): + return super().to_dict(data={"system": self.system(), "database": self.database()}) + + def system(self): + data = {} + data["remark"] = ( + "This section includes system information about the machine running CrateDB " + 'Toolkit, effectively about the "compute" domain.' + ) + data["application"] = PlatformInfo.application() + data["eco"] = boltons.ecoutils.get_profile(scrub=self.scrub) + # data["libraries"] = PlatformInfo.libraries() # noqa: ERA001 + return data + + def database(self): + data = {} + data["remark"] = ( + "This section includes system and other diagnostics information about the CrateDB " + 'database cluster, effectively about the "storage" domain.' + ) + for element in self.elements.items: + data[element.name] = self.evaluate_element(element) + return data + + +class LogContainer(InfoContainerBase): + def register_builtins(self): + self.elements.add( + Library.Logs.user_queries_latest, + ) + + +class JobInfoContainer(InfoContainerBase): + def register_builtins(self): + self.elements.add( + Library.JobInfo.age_range, + Library.JobInfo.by_user, + Library.JobInfo.duration_buckets, + Library.JobInfo.duration_percentiles, + Library.JobInfo.history100, + Library.JobInfo.history_count, + Library.JobInfo.performance15min, + Library.JobInfo.running, + Library.JobInfo.running_count, + Library.JobInfo.top100_count, + Library.JobInfo.top100_duration_individual, + Library.JobInfo.top100_duration_total, + ) diff --git a/cratedb_toolkit/wtf/http.py b/cratedb_toolkit/wtf/http.py new file mode 100644 index 00000000..1d70a4a7 --- /dev/null +++ b/cratedb_toolkit/wtf/http.py @@ -0,0 +1,41 @@ +# Copyright (c) 2021-2023, Crate.io Inc. +# Distributed under the terms of the AGPLv3 license, see LICENSE. +import logging +import os +import typing as t +from functools import lru_cache + +from fastapi import Depends, FastAPI, HTTPException + +from cratedb_toolkit.util import DatabaseAdapter +from cratedb_toolkit.util.service import start_service +from cratedb_toolkit.wtf.core import InfoContainer +from cratedb_toolkit.wtf.util import get_baseinfo + +logger = logging.getLogger(__name__) + +app = FastAPI() + + +@lru_cache +def database_adapter() -> DatabaseAdapter: + # TODO: return config.Settings() + cratedb_sqlalchemy_url = os.environ["CRATEDB_SQLALCHEMY_URL"] + return DatabaseAdapter(dburi=cratedb_sqlalchemy_url) + + +@app.get("/") +def read_root(): + return get_baseinfo() + + +@app.get("/info/{category}") +def info(category: str, adapter: t.Annotated[DatabaseAdapter, Depends(database_adapter)], scrub: bool = False): # type: ignore[name-defined] + if category != "all": + raise HTTPException(status_code=404, detail="Info category not found") + sample = InfoContainer(adapter=adapter, scrub=scrub) + return sample.to_dict() + + +def start(listen_address: t.Union[str, None] = None, reload: bool = False): # pragma: no cover + start_service(app="cratedb_toolkit.wtf.http:app") diff --git a/cratedb_toolkit/wtf/library.py b/cratedb_toolkit/wtf/library.py new file mode 100644 index 00000000..a2167bbc --- /dev/null +++ b/cratedb_toolkit/wtf/library.py @@ -0,0 +1,587 @@ +# Copyright (c) 2021-2024, Crate.io Inc. +# Distributed under the terms of the AGPLv3 license, see LICENSE. +from cratedb_toolkit.wtf.model import InfoElement, LogElement +from cratedb_toolkit.wtf.util import get_single_value + + +class Library: + """ + A collection of SQL queries and utilities suitable for diagnostics on CrateDB. + + Credits to the many authors and contributors of CrateDB diagnostics utilities, + dashboards, and cheat sheets. + + Acknowledgements: Baurzhan Sakhariev, Eduardo Legatti, Georg Traar, Hernan + Cianfagna, Ivan Sanchez Valencia, Karyn Silva de Azevedo, Niklas Schmidtmer, + Walter Behmann. + + References: + - https://community.cratedb.com/t/similar-elasticsearch-commands/1455/4 + - CrateDB Admin UI. + - CrateDB Grafana General Diagnostics Dashboard. + - Debugging CrateDB - Queries Cheat Sheet. + """ + + class Health: + """ + CrateDB health check queries. + """ + + backups_recent = InfoElement( + name="backups_recent", + label="Recent Backups", + sql=""" + SELECT repository, name, finished, state + FROM sys.snapshots + ORDER BY finished DESC + LIMIT 10; + """, + description="Most recent 10 backups", + ) + + cluster_name = InfoElement( + name="cluster_name", + label="Cluster name", + sql=r"SELECT name FROM sys.cluster;", + transform=get_single_value("name"), + ) + + nodes_count = InfoElement( + name="cluster_nodes_count", + label="Total number of cluster nodes", + sql=r"SELECT COUNT(id) AS count FROM sys.nodes;", + transform=get_single_value("count"), + ) + nodes_list = InfoElement( + name="cluster_nodes_list", + label="Cluster Nodes", + sql="SELECT * FROM sys.nodes ORDER BY hostname;", + description="Telemetry information for all cluster nodes.", + ) + table_health = InfoElement( + name="table_health", + label="Table Health", + sql="SELECT health, COUNT(*) AS table_count FROM sys.health GROUP BY health;", + description="Table health short summary", + ) + + class JobInfo: + """ + Information distilled from `sys.jobs_log` and `sys.jobs`. + """ + + age_range = InfoElement( + name="age_range", + label="Query age range", + description="Timestamps of first and last job", + sql=""" + SELECT + MIN(started) AS "first_job", + MAX(started) AS "last_job" + FROM sys.jobs_log; + """, + ) + by_user = InfoElement( + name="by_user", + label="Queries by user", + sql=r""" + SELECT + username, + COUNT(username) AS count + FROM sys.jobs_log + GROUP BY username + ORDER BY count DESC; + """, + description="Total number of queries per user.", + ) + + duration_buckets = InfoElement( + name="duration_buckets", + label="Query Duration Distribution (Buckets)", + sql=""" + WITH dur AS ( + SELECT + ended-started::LONG AS duration + FROM sys.jobs_log + ), + pct AS ( + SELECT + [0.25,0.5,0.75,0.99,0.999,1] pct_in, + percentile(duration,[0.25,0.5,0.75,0.99,0.999,1]) as pct, + count(*) cnt + FROM dur + ) + SELECT + UNNEST(pct_in) * 100 AS bucket, + cnt - CEIL(UNNEST(pct_in) * cnt) AS count, + CEIL(UNNEST(pct)) duration + ---cnt + FROM pct; + """, + description="Distribution of query durations, bucketed.", + ) + duration_percentiles = InfoElement( + name="duration_percentiles", + label="Query Duration Distribution (Percentiles)", + sql=""" + SELECT + min(ended-started::LONG) AS min, + percentile(ended-started::LONG, 0.50) AS p50, + percentile(ended-started::LONG, 0.90) AS p90, + percentile(ended-started::LONG, 0.99) AS p99, + MAX(ended-started::LONG) AS max + FROM + sys.jobs_log + LIMIT 50; + """, + description="Distribution of query durations, percentiles.", + ) + history100 = InfoElement( + name="history", + label="Query History", + sql=""" + SELECT + started AS "time", + stmt, + (ended::LONG - started::LONG) AS duration, + username + FROM sys.jobs_log + WHERE stmt NOT ILIKE '%snapshot%' + ORDER BY time DESC + LIMIT 100; + """, + transform=lambda x: list(reversed(x)), + description="Statements and durations of the 100 recent queries / jobs.", + ) + history_count = InfoElement( + name="history_count", + label="Query History Count", + sql=""" + SELECT + COUNT(*) AS job_count + FROM + sys.jobs_log; + """, + transform=get_single_value("job_count"), + description="Total number of queries on this node.", + ) + performance15min = InfoElement( + name="performance15min", + label="Query performance 15min", + sql=r""" + SELECT + CURRENT_TIMESTAMP AS last_timestamp, + (ended / 10000) * 10000 + 5000 AS ended_time, + COUNT(*) / 10.0 AS qps, + TRUNC(AVG(ended::BIGINT - started::BIGINT), 2) AS duration, + UPPER(regexp_matches(stmt,'^\s*(\w+).*')[1]) AS query_type + FROM + sys.jobs_log + WHERE + ended > now() - ('15 minutes')::INTERVAL + GROUP BY 1, 2, 5 + ORDER BY ended_time ASC; + """, + description="The query performance within the last 15 minutes, including two metrics: " + "queries per second, and query speed (ms).", + ) + running = InfoElement( + name="running", + label="Currently Running Queries", + sql=""" + SELECT + started AS "time", + stmt, + (CURRENT_TIMESTAMP::LONG - started::LONG) AS duration, + username + FROM sys.jobs + WHERE stmt NOT ILIKE '%snapshot%' + ORDER BY time; + """, + description="Statements and durations of currently running queries / jobs.", + ) + running_count = InfoElement( + name="running_count", + label="Number of running queries", + sql=""" + SELECT + COUNT(*) AS job_count + FROM + sys.jobs; + """, + transform=get_single_value("job_count"), + description="Total number of currently running queries.", + ) + top100_count = InfoElement( + name="top100_count", + label="Query frequency", + description="The 100 most frequent queries.", + sql=""" + SELECT + stmt, + COUNT(stmt) AS stmt_count, + MAX((ended::LONG - started::LONG) ) AS max_duration, + MIN((ended::LONG - started::LONG) ) AS min_duration, + AVG((ended::LONG - started::LONG) ) AS avg_duration, + PERCENTILE((ended::LONG - started::LONG), 0.99) AS p90 + FROM sys.jobs_log + GROUP BY stmt + ORDER BY stmt_count DESC + LIMIT 100; + """, + ) + top100_duration_individual = InfoElement( + name="top100_duration_individual", + label="Individual Query Duration", + description="The 100 queries by individual duration.", + sql=""" + SELECT + (ended::LONG - started::LONG) AS duration, + stmt + FROM sys.jobs_log + ORDER BY duration DESC + LIMIT 100; + """, + unit="ms", + ) + top100_duration_total = InfoElement( + name="top100_duration_total", + label="Total Query Duration", + description="The 100 queries by total duration.", + sql=""" + SELECT + SUM(ended::LONG - started::LONG) AS total_duration, + stmt, + COUNT(stmt) AS stmt_count + FROM sys.jobs_log + GROUP BY stmt + ORDER BY total_duration DESC + LIMIT 100; + """, + unit="ms", + ) + + class Logs: + """ + Access `sys.jobs_log` for logging purposes. + """ + + # TODO: Implement `tail` in one way or another. -- https://stackoverflow.com/q/4714975 + # SELECT * FROM sys.jobs_log OFFSET -10; + # SELECT * FROM sys.jobs_log OFFSET (SELECT count(*) FROM sys.jobs_log)-10; + # https://cratedb.com/docs/crate/reference/en/latest/general/builtins/scalar-functions.html#to-char-expression-format-string + # https://cratedb.com/docs/crate/reference/en/latest/general/builtins/scalar-functions.html#date-format-format-string-timezone-timestamp + user_queries_latest = LogElement( + name="user_queries_latest", + label="Latest User Queries", + sql=r""" + SELECT + DATE_FORMAT('%Y-%m-%dT%H:%i:%s.%f', started) AS started, + DATE_FORMAT('%Y-%m-%dT%H:%i:%s.%f', ended) AS ended, + classification, stmt, username, node + FROM + sys.jobs_log + WHERE + stmt NOT LIKE '%sys.%' AND + stmt NOT LIKE '%information_schema.%' + ORDER BY ended DESC + LIMIT {limit}; + """, + ) + + class Replication: + """ + Information about logical replication. + """ + + # https://github.com/crate/crate/blob/master/docs/admin/logical-replication.rst#monitoring + subscriptions = """ + SELECT s.subname, s.subpublications, sr.srrelid::text, sr.srsubstate, sr.srsubstate_reason + FROM pg_subscription s + JOIN pg_subscription_rel sr ON s.oid = sr.srsubid + ORDER BY s.subname; + """ + + class Resources: + """ + About system resources. + """ + + # TODO: Needs templating. + column_cardinality = """ + SELECT tablename, attname, n_distinct + FROM pg_stats + WHERE schemaname = '...' + AND tablename IN (...) + AND attname IN (...); + """ + + file_descriptors = """ + SELECT + name AS node_name, + process['open_file_descriptors'] AS "open_file_descriptors", + process['max_open_file_descriptors'] AS max_open_file_descriptors + FROM sys.nodes + ORDER BY node_name; + """ + + heap_usage = """ + SELECT + name AS node_name, + heap['used'] / heap['max']::DOUBLE AS heap_used + FROM sys.nodes + ORDER BY node_name; + """ + + tcp_connections = """ + SELECT + name AS node_name, + connections + FROM sys.nodes + ORDER BY node_name; + """ + + # TODO: Why "14"? Is it about only getting information about the `write` thread pool? + thread_pools = """ + SELECT + name AS node_name, + thread_pools[14]['queue'], + thread_pools[14]['active'], + thread_pools[14]['threads'] + FROM sys.nodes + ORDER BY node_name; + """ + + class Settings: + """ + Reflect cluster settings. + """ + + info = """ + SELECT + name, + master_node, + settings['cluster']['routing']['allocation']['cluster_concurrent_rebalance'] + AS cluster_concurrent_rebalance, + settings['indices']['recovery']['max_bytes_per_sec'] AS max_bytes_per_sec + FROM sys.cluster + LIMIT 1; + """ + + class Shards: + """ + Information about shard / node / table / partition allocation and rebalancing. + """ + + # https://cratedb.com/docs/crate/reference/en/latest/admin/system-information.html#example + # TODO: Needs templating. + for_table = """ + SELECT + schema_name, + table_name, + id, + partition_ident, + num_docs, + primary, + relocating_node, + routing_state, + state, + orphan_partition + FROM sys.shards + WHERE schema_name = '{schema_name}' AND table_name = '{table_name}'; + """ + + # Identify the location of the shards for each partition. + # TODO: Needs templating. + location_for_partition = """ + SELECT table_partitions.table_schema, + table_partitions.table_name, + table_partitions.values[{partition_column}]::TIMESTAMP, + shards.primary, + shards.node['name'] + FROM sys.shards + JOIN information_schema.table_partitions ON shards.partition_ident=table_partitions.partition_ident + WHERE table_partitions.table_name = {table_name} + ORDER BY 1,2,3,4,5; + """ + + allocation = InfoElement( + name="shard_allocation", + sql=""" + SELECT + IF(s.primary = TRUE, 'primary', 'replica') AS shard_type, + COALESCE(shards, 0) AS shards + FROM + UNNEST([true, false]) s(primary) + LEFT JOIN ( + SELECT primary, COUNT(*) AS shards + FROM sys.allocations + WHERE current_state != 'STARTED' + GROUP BY 1 + ) a ON s.primary = a.primary; + """, + label="Shard Allocation", + description="Support identifying issues with shard allocation.", + ) + + max_checkpoint_delta = InfoElement( + name="max_checkpoint_delta", + sql=""" + SELECT + COALESCE(MAX(seq_no_stats['local_checkpoint'] - seq_no_stats['global_checkpoint']), 0) + AS max_checkpoint_delta + FROM sys.shards; + """, + transform=get_single_value("max_checkpoint_delta"), + label="Delta between local and global checkpoint", + description="If the delta between the local and global checkpoint is significantly large, " + "shard replication might have stalled or slowed down.", + ) + + # data-hot-2 262 + # data-hot-1 146 + node_shard_distribution = InfoElement( + name="node_shard_distribution", + label="Shard Distribution", + sql=""" + SELECT + node['name'] AS node_name, + COUNT(*) AS num_shards + FROM sys.shards + WHERE primary = true + GROUP BY node_name; + """, + description="Shard distribution across nodes.", + ) + + not_started = InfoElement( + name="shard_not_started", + label="Shards not started", + sql=""" + SELECT * + FROM sys.allocations + WHERE current_state != 'STARTED'; + """, + description="Information about shards which have not been started.", + ) + not_started_count = InfoElement( + name="shard_not_started_count", + label="Number of shards not started", + description="Total number of shards which have not been started.", + sql=""" + SELECT COUNT(*) AS not_started_count + FROM sys.allocations + WHERE current_state != 'STARTED'; + """, + transform=get_single_value("not_started_count"), + ) + + rebalancing_progress = InfoElement( + name="shard_rebalancing_progress", + label="Shard Rebalancing Progress", + sql=""" + SELECT + table_name, + schema_name, + recovery['stage'] AS recovery_stage, + AVG(recovery['size']['percent']) AS progress, + COUNT(*) AS count + FROM + sys.shards + GROUP BY table_name, schema_name, recovery_stage; + """, + description="Information about rebalancing progress.", + ) + + rebalancing_status = InfoElement( + name="shard_rebalancing_status", + label="Shard Rebalancing Status", + sql=""" + SELECT node['name'], id, recovery['stage'], recovery['size']['percent'], routing_state, state + FROM sys.shards + WHERE routing_state IN ('INITIALIZING', 'RELOCATING') + ORDER BY id; + """, + description="Information about rebalancing activities.", + ) + + table_allocation = InfoElement( + name="table_allocation", + label="Table Allocations", + sql=""" + SELECT + table_schema, table_name, node_id, shard_id, partition_ident, current_state, decisions, explanation + FROM + sys.allocations; + """, + description="Table allocation across nodes, shards, and partitions.", + ) + + table_allocation_special = InfoElement( + name="table_allocation_special", + label="Table Allocations Special", + sql=""" + SELECT decisions[2]['node_name'] AS node_name, COUNT(*) AS table_count + FROM sys.allocations + GROUP BY decisions[2]['node_name']; + """, + description="Table allocation. Special.", + ) + + table_shard_count = InfoElement( + name="table_shard_count", + label="Table Shard Count", + sql=""" + SELECT + table_schema, + table_name, + SUM(number_of_shards) AS num_shards + FROM + information_schema.table_partitions + WHERE + closed = false + GROUP BY table_schema, table_name; + """, + description="Total number of shards per table.", + ) + + total_count = InfoElement( + name="shard_total_count", + label="Number of shards", + description="Total number of shards.", + sql=""" + SELECT COUNT(*) AS shard_count + FROM sys.shards + """, + transform=get_single_value("shard_count"), + ) + + # TODO: Are both `translog_uncommitted` items sensible? + translog_uncommitted = InfoElement( + name="translog_uncommitted", + label="Uncommitted Translog", + description="Check if translogs are committed properly by comparing the " + "`flush_threshold_size` with the `uncommitted_size` of a shard.", + sql=""" + SELECT + sh.table_name, + sh.partition_ident, + SUM(sh.translog_stats['uncommitted_size']) / POWER(1024, 3) as "translog_uncomitted_in_gib" + FROM information_schema.table_partitions tp + JOIN sys.shards sh USING (table_name, partition_ident) + WHERE sh.translog_stats['uncommitted_size'] > settings['translog']['flush_threshold_size'] + GROUP BY 1, 2 + ORDER BY 3 DESC; + """, + ) + translog_uncommitted_size = InfoElement( + name="translog_uncommitted_size", + label="Total uncommitted translog size", + description="A large number of uncommitted translog operations can indicate issues with shard replication.", + sql=""" + SELECT COALESCE(SUM(translog_stats['uncommitted_size']), 0) AS translog_uncommitted_size + FROM sys.shards; + """, + transform=get_single_value("translog_uncommitted_size"), + unit="bytes", + ) diff --git a/cratedb_toolkit/wtf/model.py b/cratedb_toolkit/wtf/model.py new file mode 100644 index 00000000..4f1515c0 --- /dev/null +++ b/cratedb_toolkit/wtf/model.py @@ -0,0 +1,86 @@ +# Copyright (c) 2021-2023, Crate.io Inc. +# Distributed under the terms of the AGPLv3 license, see LICENSE. +import dataclasses +import typing as t +from abc import abstractmethod + +from cratedb_toolkit.util import DatabaseAdapter +from cratedb_toolkit.wtf.util import get_baseinfo + + +@dataclasses.dataclass +class InfoElement: + name: str + label: str + sql: str + description: t.Union[str, None] = None + transform: t.Union[t.Callable, None] = None + unit: t.Union[str, None] = None + + def to_dict(self): + data = dataclasses.asdict(self) + data["sql"] = data["sql"].strip() + data["transform"] = str(data["transform"]) + return data + + +@dataclasses.dataclass +class LogElement(InfoElement): + limit: int = 100 + + +@dataclasses.dataclass +class ElementStore: + items: t.List[InfoElement] = dataclasses.field(default_factory=list) + index: t.Dict[str, InfoElement] = dataclasses.field(default_factory=dict) + + def add(self, *elements: InfoElement): + for element in elements: + self.items.append(element) + if element.name in self.index: + raise KeyError(f"Duplicate key/label: {element.name}") + self.index[element.name] = element + + +class InfoContainerBase: + def __init__(self, adapter: DatabaseAdapter, scrub: bool = False): + self.adapter = adapter + self.scrub = scrub + self.elements = ElementStore() + self.register_builtins() + + @abstractmethod + def register_builtins(self): + raise NotImplementedError("Method needs to be implemented by child class") + + def metadata(self): + data = {} + data.update(get_baseinfo()) + data["elements"] = {} + for element in self.elements.items: + data["elements"][element.name] = element.to_dict() + return data + + def evaluate_element(self, element: InfoElement): + sql = element.sql + if isinstance(element, LogElement): + sql = sql.format(limit=element.limit) + results = self.adapter.run_sql(sql, records=True) + if element.transform is not None: + results = element.transform(results) + return results + + def to_dict(self, data=None): + if data is None: + data = self.render() + return {"meta": self.metadata(), "data": data} + + def render(self): + data = {} + for element in self.elements.items: + data[element.name] = self.evaluate_element(element) + return data + + # FIXME + def by_table(self, schema: str, table: str): + pass diff --git a/cratedb_toolkit/wtf/query_collector.py b/cratedb_toolkit/wtf/query_collector.py new file mode 100644 index 00000000..47255192 --- /dev/null +++ b/cratedb_toolkit/wtf/query_collector.py @@ -0,0 +1,247 @@ +# Copyright (c) 2021-2023, Crate.io Inc. +# Distributed under the terms of the AGPLv3 license, see LICENSE. + +# ruff: noqa: S608 +import json +import logging +import os +import time +from uuid import uuid4 + +import urllib3 +from crate import client + +logger = logging.getLogger(__name__) + +host = os.getenv("HOSTNAME", "localhost:4200") +username = os.getenv("USERNAME", "crate") +password = os.getenv("PASSWORD", "") +interval = float(os.getenv("INTERVAL", 10)) +stmt_log_table = os.getenv("STMT_TABLE", "stats.statement_log") +last_exec_table = os.getenv("LAST_EXEC_TABLE", "stats.last_execution") +last_execution_ts = 0 +sys_jobs_log = {} +bucket_list = [10, 50, 100, 500, 1000, 2000, 5000, 10000, 15000, 20000] +bucket_dict = { + "10": 0, + "50": 0, + "100": 0, + "500": 0, + "1000": 0, + "2000": 0, + "5000": 0, + "10000": 0, + "15000": 0, + "20000": 0, + "INF": 0, +} + + +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) +conn = client.connect(host, username=username, password=password) +cursor = conn.cursor() +last_scrape = int(time.time() * 1000) - (interval * 60000) + +TRACING = False + + +def init(): + stmt = ( + f"CREATE TABLE IF NOT EXISTS {stmt_log_table} " + f"(id TEXT, stmt TEXT, calls INT, bucket OBJECT, last_used TIMESTAMP, " + f"username TEXT, query_type TEXT, avg_duration FLOAT, nodes ARRAY(TEXT))" + ) + cursor.execute(stmt) + stmt = f"SELECT id, stmt, calls, bucket, username, query_type, avg_duration, nodes, last_used FROM {stmt_log_table}" + cursor.execute(stmt) + init_stmts(cursor.fetchall()) + stmt = f"CREATE TABLE IF NOT EXISTS {last_exec_table} (last_execution TIMESTAMP)" + cursor.execute(stmt) + stmt = f"SELECT last_execution FROM {last_exec_table}" + cursor.execute(stmt) + init_last_execution(cursor.fetchall()) + + +def init_last_execution(last_execution): + global last_execution_ts + if len(last_execution) == 0: + last_execution_ts = 0 + stmt = f"INSERT INTO {last_exec_table} (last_execution) VALUES (?)" + cursor.execute(stmt, (0,)) + else: + last_execution_ts = last_execution[0][0] + + +def init_stmts(stmts): + for stmt in stmts: + stmt_id = stmt[0] + stmt_column = stmt[1] + calls = stmt[2] + bucket = stmt[3] + user = stmt[4] + stmt_type = stmt[5] + avg_duration = stmt[6] + nodes = stmt[7] + last_used = stmt[8] + + if stmt_column not in sys_jobs_log: + sys_jobs_log[stmt_column] = { + "id": stmt_id, + "size": 0, + "info": [], + "calls": calls, + "bucket": bucket, + "user": user, + "type": stmt_type, + "avg_duration": avg_duration, + "nodes": nodes, + "last_used": last_used, + "in_db": True, + "changed": False, + } + + +def write_stats_to_db(): + logger.info("Writing statistics to database") + write_query_stmt = ( + f"INSERT INTO {stmt_log_table} " + f"(id, stmt, calls, bucket, username, query_type, avg_duration, nodes, last_used) " + f"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" + ) + update_query_stmt = ( + f"UPDATE {stmt_log_table} " + f"SET calls = ?, avg_duration = ?, nodes = ?, bucket = ?, last_used = ? " + f"WHERE id = ?" + ) + write_params = [] + for key in sys_jobs_log.keys(): + if not sys_jobs_log[key]["in_db"]: + write_params.append( + [ + sys_jobs_log[key]["id"], + key, + sys_jobs_log[key]["calls"], + sys_jobs_log[key]["bucket"], + sys_jobs_log[key]["user"], + sys_jobs_log[key]["type"], + sys_jobs_log[key]["avg_duration"], + sys_jobs_log[key]["nodes"], + sys_jobs_log[key]["last_used"], + ] + ) + sys_jobs_log[key]["in_db"] = True + sys_jobs_log[key]["changed"] = False + elif sys_jobs_log[key]["changed"]: + cursor.execute( + update_query_stmt, + ( + sys_jobs_log[key]["calls"], + sys_jobs_log[key]["avg_duration"], + sys_jobs_log[key]["nodes"], + sys_jobs_log[key]["bucket"], + sys_jobs_log[key]["last_used"], + sys_jobs_log[key]["id"], + ), + ) + sys_jobs_log[key]["changed"] = False + if len(write_params) > 0: + cursor.executemany(write_query_stmt, write_params) + + stmt = f"UPDATE {last_exec_table} SET last_execution = ?" + cursor.execute(stmt, (last_scrape,)) + + +def read_stats(): + stmt = ( + f"SELECT id, stmt, calls, avg_duration, bucket, username, query_type, nodes, last_used " + f"FROM {stmt_log_table} ORDER BY calls DESC, avg_duration DESC;" + ) + cursor.execute(stmt) + init_stmts(cursor.fetchall()) + return sys_jobs_log + + +def assign_to_bucket(bucket, duration): + found = False + for element in bucket_list: + if duration < element: + found = True + bucket[str(element)] += 1 + break + if not found: + bucket["INF"] += 1 + + return bucket + + +def update_statistics(query_results): + global sys_jobs_log + for result in query_results: + started = result[0] + ended = result[1] + classification = result[2] + stmt = result[3] + user = result[4] + node = json.dumps(result[5]) + + duration = ended - started + if stmt not in sys_jobs_log: + sys_jobs_log[stmt] = { + "id": str(uuid4()), + "calls": 0, + "bucket": dict(bucket_dict), + "user": user, + "type": classification["type"], + "avg_duration": duration, + "in_db": False, + "last_used": started, + "nodes": [], + "changed": True, + } + sys_jobs_log[stmt]["changed"] = True + sys_jobs_log[stmt]["avg_duration"] = (sys_jobs_log[stmt]["avg_duration"] + duration) / 2 + sys_jobs_log[stmt]["bucket"] = assign_to_bucket(sys_jobs_log[stmt]["bucket"], duration) + sys_jobs_log[stmt]["last_used"] = started + sys_jobs_log[stmt]["calls"] += 1 + sys_jobs_log[stmt]["nodes"].append(node) + sys_jobs_log[stmt]["nodes"] = list(set(sys_jobs_log[stmt]["nodes"])) # only save unique nodes + if TRACING: + logger.info(f"Updated statistics: {sys_jobs_log}") + + +def scrape_db(): + global last_scrape + logger.info("Reading sys.jobs_log") + next_scrape = int(time.time() * 1000) + stmt = ( + f"SELECT " + f"started, ended, classification, stmt, username, node " + f"FROM sys.jobs_log " + f"WHERE " + f"stmt NOT LIKE '%sys.%' AND " + f"stmt NOT LIKE '%information_schema.%' " + f"AND ended BETWEEN {last_scrape} AND {next_scrape} " + f"ORDER BY ended DESC" + ) + + cursor.execute(stmt) + result = cursor.fetchall() + update_statistics(result) + last_scrape = next_scrape + + +def run(): + scrape_db() + write_stats_to_db() + + +def main(): + init() + while True: + run() + logger.info(f"Sleeping for {interval} seconds") + time.sleep(interval) + + +if __name__ == "__main__": + main() diff --git a/cratedb_toolkit/wtf/util.py b/cratedb_toolkit/wtf/util.py new file mode 100644 index 00000000..983863f2 --- /dev/null +++ b/cratedb_toolkit/wtf/util.py @@ -0,0 +1,21 @@ +# Copyright (c) 2021-2023, Crate.io Inc. +# Distributed under the terms of the AGPLv3 license, see LICENSE. +import datetime as dt +import functools as ft +import typing as t + +from boltons.iterutils import get_path + +from cratedb_toolkit import __appname__, __version__ + + +def get_baseinfo(): + data: t.Dict[str, t.Union[str, dt.datetime]] = {} + data["system_time"] = dt.datetime.now() + data["application_name"] = __appname__ + data["application_version"] = __version__ + return data + + +def get_single_value(column_name: str): + return ft.partial(get_path, path=(0, column_name)) diff --git a/pyproject.toml b/pyproject.toml index 814bb780..66f71c82 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -92,10 +92,12 @@ dependencies = [ "crash", "crate[sqlalchemy]>=0.34", "croud==1.8", + "fastapi<0.105", 'importlib-metadata; python_version <= "3.7"', "python-dotenv<2", "sqlalchemy", "sqlparse<0.5", + "uvicorn<0.25", ] [project.optional-dependencies] all = [ @@ -114,7 +116,7 @@ influxdb = [ ] io = [ "cr8", - "dask>=2020,<=2023.11.0", + "dask<=2023.11.0,>=2020", "pandas<3,>=1", ] mongodb = [ @@ -147,6 +149,7 @@ repository = "https://github.com/crate-workbench/cratedb-toolkit" [project.scripts] cratedb-retention = "cratedb_toolkit.retention.cli:cli" cratedb-toolkit = "cratedb_toolkit.cli:cli" +cratedb-wtf = "cratedb_toolkit.wtf.cli:cli" ctk = "cratedb_toolkit.cli:cli" migr8 = "cratedb_toolkit.io.mongodb.cli:main" diff --git a/tests/retention/test_cli.py b/tests/retention/test_cli.py index b100e50a..01b7487c 100644 --- a/tests/retention/test_cli.py +++ b/tests/retention/test_cli.py @@ -61,7 +61,7 @@ def test_setup_verbose(caplog, cratedb, settings): assert result.exit_code == 0 assert cratedb.database.table_exists(settings.policy_table.fullname) is True - assert 3 <= len(caplog.records) <= 10 + assert 3 <= len(caplog.records) <= 15 def test_setup_dryrun(caplog, cratedb, settings): diff --git a/tests/wtf/__init__.py b/tests/wtf/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/wtf/test_cli.py b/tests/wtf/test_cli.py new file mode 100644 index 00000000..b9620cd2 --- /dev/null +++ b/tests/wtf/test_cli.py @@ -0,0 +1,111 @@ +import json + +from boltons.iterutils import get_path +from click.testing import CliRunner + +from cratedb_toolkit.wtf.cli import cli + + +def test_wtf_cli_info(cratedb): + """ + Verify `cratedb-wtf info`. + """ + + # Invoke command. + runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb.database.dburi}) + result = runner.invoke( + cli, + args="info", + catch_exceptions=False, + ) + assert result.exit_code == 0 + + # Verify outcome. + info = json.loads(result.output) + assert "meta" in info + assert "data" in info + + system_keys = list(get_path(info, ("data", "system")).keys()) + database_keys = list(get_path(info, ("data", "database")).keys()) + assert system_keys == [ + "remark", + "application", + "eco", + # "libraries", + ] + assert "cluster_name" in database_keys + assert "cluster_nodes_count" in database_keys + + +def test_wtf_cli_logs(cratedb): + """ + Verify `cratedb-wtf logs`. + """ + + # Invoke command. + runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb.database.dburi}) + result = runner.invoke( + cli, + args="logs", + catch_exceptions=False, + ) + assert result.exit_code == 0 + + # Verify outcome. + info = json.loads(result.output) + assert "meta" in info + assert "data" in info + + data_keys = list(info["data"].keys()) + assert "user_queries_latest" in data_keys + assert len(info["data"]["user_queries_latest"]) > 3 + + +def test_wtf_cli_job_info(cratedb): + """ + Verify `cratedb-wtf job-info`. + """ + + # Invoke command. + runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb.database.dburi}) + result = runner.invoke( + cli, + args="job-info", + catch_exceptions=False, + ) + assert result.exit_code == 0 + + # Verify outcome. + info = json.loads(result.output) + assert "meta" in info + assert "data" in info + + data_keys = list(info["data"].keys()) + assert "by_user" in data_keys + assert "top100_count" in data_keys + assert "top100_duration_individual" in data_keys + assert "top100_duration_total" in data_keys + assert "performance15min" in data_keys + + +def test_wtf_cli_statistics_view(cratedb): + """ + Verify `cratedb-wtf job-statistics view`. + """ + + # Invoke command. + runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb.database.dburi}) + result = runner.invoke( + cli, + args="job-statistics view", + catch_exceptions=False, + ) + assert result.exit_code == 0 + + # Verify outcome. + info = json.loads(result.output) + assert "meta" in info + assert "data" in info + + data_keys = list(info["data"].keys()) + assert "stats" in data_keys