From 0e3ca31a0a23c271e1cbaaabf729b4e556ac1e0b Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Fri, 1 Dec 2023 14:13:58 +0100 Subject: [PATCH] cratedb-wtf: Add `cratedb-wtf` diagnostics program --- CHANGES.md | 1 + cratedb_toolkit/cli.py | 2 + cratedb_toolkit/util/cli.py | 8 +- cratedb_toolkit/util/data.py | 15 +- cratedb_toolkit/util/platform.py | 56 ++++++ cratedb_toolkit/util/service.py | 24 +++ cratedb_toolkit/wtf/README.md | 47 +++++ cratedb_toolkit/wtf/__init__.py | 0 cratedb_toolkit/wtf/backlog.md | 35 ++++ cratedb_toolkit/wtf/cli.py | 188 +++++++++++++++++++ cratedb_toolkit/wtf/core.py | 69 +++++++ cratedb_toolkit/wtf/http.py | 41 ++++ cratedb_toolkit/wtf/library.py | 133 +++++++++++++ cratedb_toolkit/wtf/model.py | 84 +++++++++ cratedb_toolkit/wtf/query_collector.py | 247 +++++++++++++++++++++++++ cratedb_toolkit/wtf/util.py | 14 ++ pyproject.toml | 3 + tests/retention/test_cli.py | 2 +- tests/wtf/__init__.py | 0 tests/wtf/test_cli.py | 101 ++++++++++ 20 files changed, 1065 insertions(+), 5 deletions(-) create mode 100644 cratedb_toolkit/util/platform.py create mode 100644 cratedb_toolkit/util/service.py create mode 100644 cratedb_toolkit/wtf/README.md create mode 100644 cratedb_toolkit/wtf/__init__.py create mode 100644 cratedb_toolkit/wtf/backlog.md create mode 100644 cratedb_toolkit/wtf/cli.py create mode 100644 cratedb_toolkit/wtf/core.py create mode 100644 cratedb_toolkit/wtf/http.py create mode 100644 cratedb_toolkit/wtf/library.py create mode 100644 cratedb_toolkit/wtf/model.py create mode 100644 cratedb_toolkit/wtf/query_collector.py create mode 100644 cratedb_toolkit/wtf/util.py create mode 100644 tests/wtf/__init__.py create mode 100644 tests/wtf/test_cli.py diff --git a/CHANGES.md b/CHANGES.md index 1a80dc53..16b6d75e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -14,6 +14,7 @@ - MongoDB: Improve UX by using `ctk load table mongodb://...` - load table: Refactor to use more OO - Add `examples/cloud_import.py` +- Add `cratedb-wtf` diagnostics 
program ## 2023/11/06 v0.0.2 diff --git a/cratedb_toolkit/cli.py b/cratedb_toolkit/cli.py index 26c0ef65..9315a701 100644 --- a/cratedb_toolkit/cli.py +++ b/cratedb_toolkit/cli.py @@ -7,6 +7,7 @@ from .io.cli import cli as io_cli from .job.cli import cli_list_jobs from .shell.cli import cli as shell_cli +from .wtf.cli import cli as wtf_cli @click.group(cls=ClickAliasedGroup) # type: ignore[arg-type] @@ -21,4 +22,5 @@ def cli(ctx: click.Context, verbose: bool, debug: bool): cli.add_command(cloud_cli, name="cluster") cli.add_command(io_cli, name="load") cli.add_command(shell_cli, name="shell") +cli.add_command(wtf_cli, name="wtf") cli.add_command(cli_list_jobs) diff --git a/cratedb_toolkit/util/cli.py b/cratedb_toolkit/util/cli.py index 9ebcb3ab..f5956dea 100644 --- a/cratedb_toolkit/util/cli.py +++ b/cratedb_toolkit/util/cli.py @@ -72,13 +72,15 @@ def boot_with_dburi(): return dburi -def make_command(cli, name, helpfun=None, aliases=None): +def make_command(cli, name, help=None, aliases=None): # noqa: A002 """ Convenience shortcut for creating a subcommand. """ kwargs = {} - if helpfun: - kwargs["help"] = docstring_format_verbatim(helpfun.__doc__) + if isinstance(help, str): + kwargs["help"] = help + elif callable(help): + kwargs["help"] = docstring_format_verbatim(help.__doc__) return cli.command( name, context_settings={"max_content_width": 120}, diff --git a/cratedb_toolkit/util/data.py b/cratedb_toolkit/util/data.py index 3a4d67d1..62cdbb33 100644 --- a/cratedb_toolkit/util/data.py +++ b/cratedb_toolkit/util/data.py @@ -1,3 +1,4 @@ +import datetime as dt import json import sys import typing as t @@ -7,7 +8,7 @@ def jd(data: t.Any): """ Pretty-print JSON with indentation. 
""" - print(json.dumps(data, indent=2), file=sys.stdout) # noqa: T201 + print(json.dumps(data, indent=2, cls=JSONEncoderPlus), file=sys.stdout) # noqa: T201 def str_contains(haystack, *needles): @@ -16,3 +17,15 @@ def str_contains(haystack, *needles): """ haystack = str(haystack) return any(needle in haystack for needle in needles) + + +class JSONEncoderPlus(json.JSONEncoder): + """ + https://stackoverflow.com/a/27058505 + """ + + def default(self, o): + if isinstance(o, dt.datetime): + return o.isoformat() + + return json.JSONEncoder.default(self, o) diff --git a/cratedb_toolkit/util/platform.py b/cratedb_toolkit/util/platform.py new file mode 100644 index 00000000..0c588dfd --- /dev/null +++ b/cratedb_toolkit/util/platform.py @@ -0,0 +1,56 @@ +import io +import json +from contextlib import redirect_stdout + + +class PlatformInfo: + @staticmethod + def application(): + import platform + + from cratedb_toolkit import __appname__, __version__ + + data = {} + + data["platform"] = platform.platform() + data["version"] = __version__ + data["name"] = __appname__ + return data + + @staticmethod + def libraries(): + data = {} + + # SQLAlchemy + from importlib.metadata import entry_points + + try: + import sqlalchemy.dialects.plugins + import sqlalchemy.dialects.registry + + data["sqlalchemy"] = { + "dialects_builtin": list(sqlalchemy.dialects.registry.impls.keys()), + "dialects_3rdparty": [dialect.name for dialect in entry_points(group="sqlalchemy.dialects")], # type: ignore[attr-defined,call-arg] + "plugins": list(sqlalchemy.dialects.plugins.impls.keys()), + } + except Exception: # noqa: S110 + pass + + # pandas + try: + import pandas + + buffer = io.StringIO() + with redirect_stdout(buffer): + pandas.show_versions(as_json=True) + buffer.seek(0) + data["pandas"] = json.load(buffer) + except Exception: # noqa: S110 + pass + + # fsspec + import fsspec + + data["fsspec"] = {"protocols": fsspec.available_protocols(), "compressions": fsspec.available_compressions()} + + 
return data diff --git a/cratedb_toolkit/util/service.py b/cratedb_toolkit/util/service.py new file mode 100644 index 00000000..a3a4a4ba --- /dev/null +++ b/cratedb_toolkit/util/service.py @@ -0,0 +1,24 @@ +# Copyright (c) 2021-2023, Crate.io Inc. +# Distributed under the terms of the AGPLv3 license, see LICENSE. +import logging + +import typing as t + +from cratedb_toolkit.util.common import setup_logging + +logger = logging.getLogger(__name__) + + +def start_service(app: str, listen_address: t.Union[str, None] = None, reload: bool = False): # pragma: no cover + setup_logging() + from uvicorn import run + + if listen_address is None: + listen_address = "127.0.0.1:4242" + + host, port = listen_address.split(":") + port_int = int(port) + + logger.info(f"Starting HTTP web service on http://{listen_address}") + + run(app=app, host=host, port=port_int, reload=reload) diff --git a/cratedb_toolkit/wtf/README.md b/cratedb_toolkit/wtf/README.md new file mode 100644 index 00000000..1632171f --- /dev/null +++ b/cratedb_toolkit/wtf/README.md @@ -0,0 +1,47 @@ +# cratedb-wtf + +A diagnostics utility in the spirit of [git-wtf], [grafana-wtf], and [pip.wtf]. +It is still a work-in-progress, but it is usable already. + + +## Synopsis + +Define CrateDB database cluster address. +```shell +export CRATEDB_SQLALCHEMY_URL=crate://localhost/ +``` + +Display system and database cluster information. +```shell +cratedb-wtf info +``` + +Display database cluster log messages. +```shell +cratedb-wtf logs +``` + +Statistics. +```shell +cratedb-wtf statistics quick +cratedb-wtf statistics collect +cratedb-wtf statistics view +``` + + +## HTTP API + +Expose collected status information. +```shell +cratedb-wtf --debug serve --reload +``` +Consume collected status information via HTTP. 
+```shell +http http://127.0.0.1:4242/info/all +``` + + + +[git-wtf]: http://thrawn01.org/posts/2014/03/03/git-wtf/ +[grafana-wtf]: https://github.com/panodata/grafana-wtf +[pip.wtf]: https://github.com/sabslikesobs/pip.wtf diff --git a/cratedb_toolkit/wtf/__init__.py b/cratedb_toolkit/wtf/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cratedb_toolkit/wtf/backlog.md b/cratedb_toolkit/wtf/backlog.md new file mode 100644 index 00000000..5a7f38a4 --- /dev/null +++ b/cratedb_toolkit/wtf/backlog.md @@ -0,0 +1,35 @@ +# cratedb-wtf backlog + +## Iteration +1 +- Complete collected queries and code snippets +- Harvest queries from Admin UI, crash, crate-admin-alt +- Harvest queries from experts + - https://tools.cr8.net/grafana/d/RkpNJx84z/cratedb-jobs-log?orgId=1&refresh=5m&var-datasource=crate-production + - https://tools.cr8.net/grafana/d/RkpNJx84z/cratedb-jobs-log?orgId=1&refresh=5m&var-datasource=crate-production&viewPanel=44 +- Add `description` field to each `InfoElement` + +## Iteration +2 +- Make `cratedb-wtf logs` also optionally consider `sys.` tables. +- cratedb-wtf explore table|shard|partition|node +- High-level analysis, evaluating a set of threshold rules +- Network diagnostics? +- Expose collected data via Glances-like UI + +## Notes +``` +ctk cluster info +ctk cluster health +ctk cluster logs --slow-queries +``` + +Acknowledgements: @baur, @hammerhea, @walbeh. + + +## Done +- Make it work +- Proper marshalling of timestamp values (ISO 8601) +- Expose collected data via HTTP API + ``` + cratedb-wtf serve + ``` +- Provide `scrub` option also via HTTP diff --git a/cratedb_toolkit/wtf/cli.py b/cratedb_toolkit/wtf/cli.py new file mode 100644 index 00000000..13bf1399 --- /dev/null +++ b/cratedb_toolkit/wtf/cli.py @@ -0,0 +1,188 @@ +# Copyright (c) 2021-2023, Crate.io Inc. +# Distributed under the terms of the AGPLv3 license, see LICENSE. 
+import logging +import os +import sys +import typing as t +import urllib.parse + +import click +from click_aliases import ClickAliasedGroup + +from cratedb_toolkit.util import DatabaseAdapter +from cratedb_toolkit.util.cli import ( + boot_click, + make_command, +) +from cratedb_toolkit.util.data import jd +from cratedb_toolkit.wtf.core import InfoContainer, LogContainer, StatisticsContainer + +logger = logging.getLogger(__name__) + + +def help_info(): + """ + Database cluster and system information. + + Synopsis + ======== + + export CRATEDB_SQLALCHEMY_URL=crate://localhost/ + cratedb-wtf info + + """ # noqa: E501 + + +def help_logs(): + """ + Database cluster logs. + + Synopsis + ======== + + export CRATEDB_SQLALCHEMY_URL=crate://localhost/ + cratedb-wtf logs + + """ # noqa: E501 + + +def help_statistics(): + """ + Database cluster query statistics. + + Synopsis + ======== + + export CRATEDB_SQLALCHEMY_URL=crate://localhost/ + cratedb-wtf statistics quick + cratedb-wtf statistics collect + cratedb-wtf statistics view + + """ # noqa: E501 + + +def help_serve(): + """ + Start HTTP service to expose collected information. 
+ + Synopsis + ======== + + export CRATEDB_SQLALCHEMY_URL=crate://localhost/ + cratedb-wtf serve + + """ # noqa: E501 + + +cratedb_sqlalchemy_option = click.option( + "--cratedb-sqlalchemy-url", envvar="CRATEDB_SQLALCHEMY_URL", type=str, required=False, help="CrateDB SQLAlchemy URL" +) + + +@click.group(cls=ClickAliasedGroup) # type: ignore[arg-type] +@cratedb_sqlalchemy_option +@click.option("--verbose", is_flag=True, required=False, help="Turn on logging") +@click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level") +@click.option("--scrub", envvar="SCRUB", is_flag=True, required=False, help="Blank out identifiable information") +@click.version_option() +@click.pass_context +def cli(ctx: click.Context, cratedb_sqlalchemy_url: str, verbose: bool, debug: bool, scrub: bool): + """ + Diagnostics and informational utilities. + """ + if not cratedb_sqlalchemy_url: + logger.error("Unable to operate without database address") + sys.exit(1) + ctx.meta.update({"cratedb_sqlalchemy_url": cratedb_sqlalchemy_url, "scrub": scrub}) + return boot_click(ctx, verbose, debug) + + +@make_command(cli, "info", help_info) +@click.pass_context +def info(ctx: click.Context): + cratedb_sqlalchemy_url = ctx.meta["cratedb_sqlalchemy_url"] + scrub = ctx.meta.get("scrub", False) + adapter = DatabaseAdapter(dburi=cratedb_sqlalchemy_url) + sample = InfoContainer(adapter=adapter, scrub=scrub) + jd(sample.to_dict()) + + +@make_command(cli, "logs", help_logs) +@click.pass_context +def logs(ctx: click.Context): + cratedb_sqlalchemy_url = ctx.meta["cratedb_sqlalchemy_url"] + scrub = ctx.meta.get("scrub", False) + adapter = DatabaseAdapter(dburi=cratedb_sqlalchemy_url) + sample = LogContainer(adapter=adapter, scrub=scrub) + jd(sample.to_dict()) + + +@click.group(cls=ClickAliasedGroup) # type: ignore[arg-type] +@click.pass_context +def statistics(ctx: click.Context): + """ + Collect and display query statistics. 
+ """ + pass + + +cli.add_command(statistics, name="statistics", aliases=["stats"]) + + +@make_command(statistics, "quick", "Display ad hoc statistics.") +@click.pass_context +def statistics_quick(ctx: click.Context): + """ + Display ad hoc statistics. + """ + cratedb_sqlalchemy_url = ctx.meta["cratedb_sqlalchemy_url"] + scrub = ctx.meta.get("scrub", False) + adapter = DatabaseAdapter(dburi=cratedb_sqlalchemy_url) + sample = StatisticsContainer(adapter=adapter, scrub=scrub) + jd(sample.to_dict()) + + +@make_command(statistics, "collect", "Collect queries from sys.jobs_log.") +@click.pass_context +def statistics_collect(ctx: click.Context): + """ + Run jobs_log collector. + + # TODO: Forward `cratedb_sqlalchemy_url` properly. + """ + import cratedb_toolkit.wtf.query_collector + + cratedb_toolkit.wtf.query_collector.main() + + +@make_command(statistics, "view", "View statistics about collected queries from sys.jobs_log.") +@click.pass_context +def statistics_view(ctx: click.Context): + """ + View statistics about collected queries from sys.jobs_log. + + # TODO: Forward `cratedb_sqlalchemy_url` properly. + """ + cratedb_sqlalchemy_url = ctx.meta["cratedb_sqlalchemy_url"] + url = urllib.parse.urlparse(cratedb_sqlalchemy_url) + hostname = f"{url.hostname}:{url.port}" + os.environ["HOSTNAME"] = hostname + + import cratedb_toolkit.wtf.query_collector + + cratedb_toolkit.wtf.query_collector.init() + + response: t.Dict = {"meta": {}, "data": {}} + response["meta"]["remark"] = "WIP! This is a work in progress. The output format will change." 
+ response["data"]["stats"] = cratedb_toolkit.wtf.query_collector.read_stats() + jd(response) + + +@make_command(cli, "serve", help_serve) +@click.option("--listen", type=click.STRING, default=None, help="HTTP server listen address") +@click.option("--reload", is_flag=True, help="Dynamically reload changed files") +@click.pass_context +def serve(ctx: click.Context, listen: str, reload: bool): + from cratedb_toolkit.wtf.http import start + + start(listen, reload=reload) diff --git a/cratedb_toolkit/wtf/core.py b/cratedb_toolkit/wtf/core.py new file mode 100644 index 00000000..a8cca4ba --- /dev/null +++ b/cratedb_toolkit/wtf/core.py @@ -0,0 +1,69 @@ +# Copyright (c) 2021-2023, Crate.io Inc. +# Distributed under the terms of the AGPLv3 license, see LICENSE. +import functools as ft + +import boltons.ecoutils +from boltons.iterutils import get_path + +from cratedb_toolkit.util.platform import PlatformInfo +from cratedb_toolkit.wtf.library import Library +from cratedb_toolkit.wtf.model import InfoContainerBase, InfoElement, LogElement + + +class InfoContainer(InfoContainerBase): + def register_builtins(self): + self.elements.add( + InfoElement( + name="cluster_name", + label="Cluster name", + sql=Library.Health.cluster_name, + transform=ft.partial(get_path, path=(0, "name")), + ), + InfoElement( + name="cluster_nodes_count", + label="Total number of cluster nodes", + sql=Library.Health.nodes_count, + transform=ft.partial(get_path, path=(0, "count")), + ), + InfoElement(name="cluster_nodes_list", label="Cluster Nodes", sql=Library.Health.nodes_list), + InfoElement(name="table_allocations", label="Table Allocations", sql=Library.Health.table_allocations), + InfoElement(name="table_health", label="Table Health", sql=Library.Health.table_health), + ) + + def to_dict(self, data=None): + return super().to_dict(data={"system": self.system(), "database": self.database()}) + + def system(self): + data = {} + data["remark"] = ( + "This section includes system information about the 
machine running CrateDB " + 'Toolkit, effectively about the "compute" domain.' + ) + data["application"] = PlatformInfo.application() + data["eco"] = boltons.ecoutils.get_profile(scrub=self.scrub) + data["libraries"] = PlatformInfo.libraries() + return data + + def database(self): + data = {} + data["remark"] = ( + "This section includes system and other diagnostics information about the CrateDB " + 'database cluster, effectively about the "storage" domain.' + ) + for element in self.elements.items: + data[element.name] = self.evaluate_element(element) + return data + + +class LogContainer(InfoContainerBase): + def register_builtins(self): + self.elements.add( + LogElement(name="queries_users_latest", label="Latest User Queries", sql=Library.Logs.user_queries), + ) + + +class StatisticsContainer(InfoContainerBase): + def register_builtins(self): + self.elements.add( + InfoElement(name="quickstats", label="Query usage statistics", sql=Library.Statistics.quickstats), + ) diff --git a/cratedb_toolkit/wtf/http.py b/cratedb_toolkit/wtf/http.py new file mode 100644 index 00000000..e12c5281 --- /dev/null +++ b/cratedb_toolkit/wtf/http.py @@ -0,0 +1,41 @@ +# Copyright (c) 2021-2023, Crate.io Inc. +# Distributed under the terms of the AGPLv3 license, see LICENSE. 
+import logging +import os +import typing as t +from functools import lru_cache + +from fastapi import Depends, FastAPI, HTTPException + +from cratedb_toolkit.util import DatabaseAdapter +from cratedb_toolkit.util.service import start_service +from cratedb_toolkit.wtf.core import InfoContainer +from cratedb_toolkit.wtf.util import get_baseinfo + +logger = logging.getLogger(__name__) + +app = FastAPI() + + +@lru_cache +def database_adapter() -> DatabaseAdapter: + # TODO: return config.Settings() + cratedb_sqlalchemy_url = os.environ["CRATEDB_SQLALCHEMY_URL"] + return DatabaseAdapter(dburi=cratedb_sqlalchemy_url) + + +@app.get("/") +def read_root(): + return get_baseinfo() + + +@app.get("/info/{category}") +def info(category: str, adapter: t.Annotated[DatabaseAdapter, Depends(database_adapter)], scrub: bool = False): + if category != "all": + raise HTTPException(status_code=404, detail="Info category not found") + sample = InfoContainer(adapter=adapter, scrub=scrub) + return sample.to_dict() + + +def start(listen_address: t.Union[str, None] = None, reload: bool = False): # pragma: no cover + start_service(app="cratedb_toolkit.wtf.http:app", listen_address=listen_address, reload=reload) diff --git a/cratedb_toolkit/wtf/library.py b/cratedb_toolkit/wtf/library.py new file mode 100644 index 00000000..2159ba1b --- /dev/null +++ b/cratedb_toolkit/wtf/library.py @@ -0,0 +1,133 @@ +# Copyright (c) 2021-2023, Crate.io Inc. +# Distributed under the terms of the AGPLv3 license, see LICENSE. + + +class Library: + """ + A collection of SQL queries and utilities suitable for diagnostics on CrateDB. + """ + + class Health: + """ + CrateDB health check queries. + + References: + + - https://community.cratedb.com/t/similar-elasticsearch-commands/1455/4 + Thanks, Eduardo Legatti. 
+ """ + + cluster_name = """ + SELECT name FROM sys.cluster; + """ + + table_allocations = """ + SELECT table_schema, table_name, node_id, shard_id, partition_ident, current_state, decisions, explanation + FROM sys.allocations; + """ + + table_allocations_special = """ + SELECT decisions[2]['node_name'] AS node_name, COUNT(*) AS table_count + FROM sys.allocations + GROUP BY decisions[2]['node_name']; + """ + + table_health = """ + SELECT health, COUNT(*) AS table_count FROM sys.health GROUP BY health; + """ + + nodes_count = """ + SELECT COUNT(id) AS count FROM sys.nodes; + """ + nodes_list = """ + SELECT * FROM sys.nodes ORDER BY hostname; + """ + + class Logs: + # TODO: Implement `tail` in one way or another. -- https://stackoverflow.com/q/4714975 + # SELECT * FROM sys.jobs_log OFFSET -10; + # SELECT * FROM sys.jobs_log OFFSET (SELECT count(*) FROM sys.jobs_log)-10; + # https://cratedb.com/docs/crate/reference/en/latest/general/builtins/scalar-functions.html#to-char-expression-format-string + # https://cratedb.com/docs/crate/reference/en/latest/general/builtins/scalar-functions.html#date-format-format-string-timezone-timestamp + user_queries = """ + SELECT + DATE_FORMAT('%Y-%m-%dT%H:%i:%s.%f', started) AS started, + DATE_FORMAT('%Y-%m-%dT%H:%i:%s.%f', ended) AS ended, + classification, stmt, username, node + FROM + sys.jobs_log + WHERE + stmt NOT LIKE '%sys.%' AND + stmt NOT LIKE '%information_schema.%' + """ + + latest_user_queries = ( + user_queries + + """ + ORDER BY ended DESC + LIMIT {limit}; + """ + ) + + class Replication: + # https://github.com/crate/crate/blob/master/docs/admin/logical-replication.rst#monitoring + subscriptions = """ + SELECT s.subname, s.subpublications, sr.srrelid::text, sr.srsubstate, sr.srsubstate_reason + FROM pg_subscription s + JOIN pg_subscription_rel sr ON s.oid = sr.srsubid + ORDER BY s.subname; + """ + + class Resources: + tcp_connections = """ + SELECT connections FROM sys.nodes LIMIT 1; + """ + + thread_pools = """ + SELECT 
thread_pools[14]['queue'], + thread_pools[14]['active'], + thread_pools[14]['threads'] + FROM sys.nodes LIMIT 100; + """ + + class Shards: + # data-hot-2 262 + # data-hot-1 146 + info = """ + SELECT node['name'], COUNT(*) + FROM sys.shards + WHERE primary = true + GROUP BY 1; + """ + + # https://cratedb.com/docs/crate/reference/en/latest/admin/system-information.html#example + for_table = """ + SELECT + schema_name, + table_name, + id, + partition_ident, + num_docs, + primary, + relocating_node, + routing_state, + state, + orphan_partition + FROM sys.shards + WHERE schema_name = '{schema_name}' AND table_name = '{table_name}'; + """ + + class Statistics: + quickstats = """ + SELECT + stmt, + COUNT(stmt) AS stmt_count, + MAX((ended::long - started::long) ) AS max_duration, + MIN((ended::long - started::long) ) AS min_duration, + avg((ended::long - started::long) ) AS avg_duration, + PERCENTILE((ended::long - started::long), 0.99) as p90 + FROM sys.jobs_log + GROUP BY stmt + ORDER BY stmt_count DESC + LIMIT 1000; + """ diff --git a/cratedb_toolkit/wtf/model.py b/cratedb_toolkit/wtf/model.py new file mode 100644 index 00000000..a71a7ac6 --- /dev/null +++ b/cratedb_toolkit/wtf/model.py @@ -0,0 +1,84 @@ +# Copyright (c) 2021-2023, Crate.io Inc. +# Distributed under the terms of the AGPLv3 license, see LICENSE. 
+import dataclasses +import typing as t +from abc import abstractmethod + +from cratedb_toolkit.util import DatabaseAdapter +from cratedb_toolkit.wtf.util import get_baseinfo + + +@dataclasses.dataclass +class InfoElement: + name: str + label: str + sql: str + transform: t.Union[t.Callable, None] = None + + def to_dict(self): + data = dataclasses.asdict(self) + data["sql"] = data["sql"].strip() + data["transform"] = str(data["transform"]) + return data + + +@dataclasses.dataclass +class LogElement(InfoElement): + limit: int = 100 + + +@dataclasses.dataclass +class ElementStore: + items: t.List[InfoElement] = dataclasses.field(default_factory=list) + index: t.Dict[str, InfoElement] = dataclasses.field(default_factory=dict) + + def add(self, *elements: InfoElement): + for element in elements: + self.items.append(element) + if element.name in self.index: + raise KeyError(f"Duplicate key/label: {element.name}") + self.index[element.name] = element + + +class InfoContainerBase: + def __init__(self, adapter: DatabaseAdapter, scrub: bool = False): + self.adapter = adapter + self.scrub = scrub + self.elements = ElementStore() + self.register_builtins() + + @abstractmethod + def register_builtins(self): + raise NotImplementedError("Method needs to be implemented by child class") + + def metadata(self): + data = {} + data.update(get_baseinfo()) + data["elements"] = {} + for element in self.elements.items: + data["elements"][element.name] = element.to_dict() + return data + + def evaluate_element(self, element: InfoElement): + sql = element.sql + if isinstance(element, LogElement): + sql = sql.format(limit=element.limit) + results = self.adapter.run_sql(sql, records=True) + if element.transform is not None: + results = element.transform(results) + return results + + def to_dict(self, data=None): + if data is None: + data = self.render() + return {"meta": self.metadata(), "data": data} + + def render(self): + data = {} + for element in self.elements.items: + data[element.name] 
= self.evaluate_element(element) + return data + + # FIXME + def by_table(self, schema: str, table: str): + pass diff --git a/cratedb_toolkit/wtf/query_collector.py b/cratedb_toolkit/wtf/query_collector.py new file mode 100644 index 00000000..d857f3ec --- /dev/null +++ b/cratedb_toolkit/wtf/query_collector.py @@ -0,0 +1,247 @@ +# Copyright (c) 2021-2023, Crate.io Inc. +# Distributed under the terms of the AGPLv3 license, see LICENSE. + +# ruff: noqa: S608 +import json +import logging +import os +import time +from uuid import uuid4 + +import urllib3 +from crate import client + +logger = logging.getLogger(__name__) + +host = os.getenv("HOSTNAME", "localhost:4200") +username = os.getenv("USERNAME", "crate") +password = os.getenv("PASSWORD", "") +interval = float(os.getenv("INTERVAL", 10)) +stmt_log_table = os.getenv("STMT_TABLE", "stats.statement_log") +last_exec_table = os.getenv("LAST_EXEC_TABLE", "stats.last_execution") +last_execution_ts = 0 +sys_jobs_log = {} +bucket_list = [10, 50, 100, 500, 1000, 2000, 5000, 10000, 15000, 20000] +bucket_dict = { + "10": 0, + "50": 0, + "100": 0, + "500": 0, + "1000": 0, + "2000": 0, + "5000": 0, + "10000": 0, + "15000": 0, + "20000": 0, + "INF": 0, +} + + +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) +conn = client.connect(host, username=username, password=password) +cursor = conn.cursor() +last_scrape = int(time.time() * 1000) - (interval * 60000) + +TRACING = False + + +def init(): + stmt = ( + f"CREATE TABLE IF NOT EXISTS {stmt_log_table} " + f"(id TEXT, stmt TEXT, calls INT, bucket OBJECT, last_used TIMESTAMP, " + f"username TEXT, query_type TEXT, avg_duration FLOAT, nodes ARRAY(TEXT))" + ) + cursor.execute(stmt) + stmt = f"SELECT id, stmt, calls, bucket, username, query_type, avg_duration, nodes, last_used FROM {stmt_log_table}" + cursor.execute(stmt) + init_stmts(cursor.fetchall()) + stmt = f"CREATE TABLE IF NOT EXISTS {last_exec_table} (last_execution TIMESTAMP)" + cursor.execute(stmt) + stmt = 
f"SELECT last_execution FROM {last_exec_table}" + cursor.execute(stmt) + init_last_execution(cursor.fetchall()) + + +def init_last_execution(last_execution): + global last_execution_ts + if len(last_execution) == 0: + last_execution_ts = 0 + stmt = f"INSERT INTO {last_exec_table} (last_execution) VALUES (?)" + cursor.execute(stmt, (0,)) + else: + last_execution_ts = last_execution[0][0] + + +def init_stmts(stmts): + for stmt in stmts: + stmt_id = stmt[0] + stmt_column = stmt[1] + calls = stmt[2] + bucket = stmt[3] + user = stmt[4] + stmt_type = stmt[5] + avg_duration = stmt[6] + nodes = stmt[7] + last_used = stmt[8] + + if stmt_column not in sys_jobs_log: + sys_jobs_log[stmt_column] = { + "id": stmt_id, + "size": 0, + "info": [], + "calls": calls, + "bucket": bucket, + "user": user, + "type": stmt_type, + "avg_duration": avg_duration, + "nodes": nodes, + "last_used": last_used, + "in_db": True, + "changed": False, + } + + +def write_stats_to_db(): + logger.info("Writing statistics to database") + write_query_stmt = ( + f"INSERT INTO {stmt_log_table} " + f"(id, stmt, calls, bucket, username, query_type, avg_duration, nodes, last_used) " + f"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" + ) + update_query_stmt = ( + f"UPDATE {stmt_log_table} " + f"SET calls = ?, avg_duration = ?, nodes = ?, bucket = ?, last_used = ? " + f"WHERE id = ?" 
+ ) + write_params = [] + for key in sys_jobs_log.keys(): + if not sys_jobs_log[key]["in_db"]: + write_params.append( + [ + sys_jobs_log[key]["id"], + key, + sys_jobs_log[key]["calls"], + sys_jobs_log[key]["bucket"], + sys_jobs_log[key]["user"], + sys_jobs_log[key]["type"], + sys_jobs_log[key]["avg_duration"], + sys_jobs_log[key]["nodes"], + sys_jobs_log[key]["last_used"], + ] + ) + sys_jobs_log[key]["in_db"] = True + sys_jobs_log[key]["changed"] = False + elif sys_jobs_log[key]["changed"]: + cursor.execute( + update_query_stmt, + ( + sys_jobs_log[key]["calls"], + sys_jobs_log[key]["avg_duration"], + sys_jobs_log[key]["nodes"], + sys_jobs_log[key]["bucket"], + sys_jobs_log[key]["last_used"], + sys_jobs_log[key]["id"], + ), + ) + sys_jobs_log[key]["changed"] = False + if len(write_params) > 0: + cursor.executemany(write_query_stmt, write_params) + + stmt = f"UPDATE {last_exec_table} SET last_execution = ?" + cursor.execute(stmt, (last_scrape,)) + + +def read_stats(): + stmt = ( + f"SELECT id, stmt, calls, avg_duration, bucket, username, query_type, nodes, last_used " + f"FROM {stmt_log_table} ORDER BY calls, avg_duration;" + ) + cursor.execute(stmt) + init_stmts(cursor.fetchall()) + return sys_jobs_log + + +def assign_to_bucket(bucket, duration): + found = False + for element in bucket_list: + if duration < element: + found = True + bucket[str(element)] += 1 + break + if not found: + bucket["INF"] += 1 + + return bucket + + +def update_statistics(query_results): + global sys_jobs_log + for result in query_results: + started = result[0] + ended = result[1] + classification = result[2] + stmt = result[3] + user = result[4] + node = json.dumps(result[5]) + + duration = ended - started + if stmt not in sys_jobs_log: + sys_jobs_log[stmt] = { + "id": str(uuid4()), + "calls": 0, + "bucket": dict(bucket_dict), + "user": user, + "type": classification["type"], + "avg_duration": duration, + "in_db": False, + "last_used": started, + "nodes": [], + "changed": True, + } + 
sys_jobs_log[stmt]["changed"] = True + sys_jobs_log[stmt]["avg_duration"] = (sys_jobs_log[stmt]["avg_duration"] + duration) / 2 + sys_jobs_log[stmt]["bucket"] = assign_to_bucket(sys_jobs_log[stmt]["bucket"], duration) + sys_jobs_log[stmt]["last_used"] = started + sys_jobs_log[stmt]["calls"] += 1 + sys_jobs_log[stmt]["nodes"].append(node) + sys_jobs_log[stmt]["nodes"] = list(set(sys_jobs_log[stmt]["nodes"])) # only save unique nodes + if TRACING: + logger.info(f"Updated statistics: {sys_jobs_log}") + + +def scrape_db(): + global last_scrape + logger.info("Reading sys.jobs_log") + next_scrape = int(time.time() * 1000) + stmt = ( + f"SELECT " + f"started, ended, classification, stmt, username, node " + f"FROM sys.jobs_log " + f"WHERE " + f"stmt NOT LIKE '%sys.%' AND " + f"stmt NOT LIKE '%information_schema.%' " + f"AND ended BETWEEN {last_scrape} AND {next_scrape} " + f"ORDER BY ended DESC" + ) + + cursor.execute(stmt) + result = cursor.fetchall() + update_statistics(result) + last_scrape = next_scrape + + +def run(): + scrape_db() + write_stats_to_db() + + +def main(): + init() + while True: + run() + logger.info(f"Sleeping for {interval} seconds") + time.sleep(interval) + + +if __name__ == "__main__": + main() diff --git a/cratedb_toolkit/wtf/util.py b/cratedb_toolkit/wtf/util.py new file mode 100644 index 00000000..8db76e2b --- /dev/null +++ b/cratedb_toolkit/wtf/util.py @@ -0,0 +1,14 @@ +# Copyright (c) 2021-2023, Crate.io Inc. +# Distributed under the terms of the AGPLv3 license, see LICENSE. 
+import datetime as dt +import typing as t + +from cratedb_toolkit import __appname__, __version__ + + +def get_baseinfo(): + data: t.Dict[str, t.Union[str, dt.datetime]] = {} + data["system_time"] = dt.datetime.now() + data["application_name"] = __appname__ + data["application_version"] = __version__ + return data diff --git a/pyproject.toml b/pyproject.toml index 02d4f5aa..744e7af8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -91,10 +91,12 @@ dependencies = [ "crash", "crate[sqlalchemy]>=0.34", "croud==1.8", + "fastapi<0.105", 'importlib-metadata; python_version <= "3.7"', "python-dotenv<2", "sqlalchemy>=2", "sqlparse<0.5", + "uvicorn<0.25", ] [project.optional-dependencies] all = [ @@ -146,6 +148,7 @@ repository = "https://github.com/crate-workbench/cratedb-toolkit" [project.scripts] cratedb-retention = "cratedb_toolkit.retention.cli:cli" cratedb-toolkit = "cratedb_toolkit.cli:cli" +cratedb-wtf = "cratedb_toolkit.wtf.cli:cli" ctk = "cratedb_toolkit.cli:cli" migr8 = "cratedb_toolkit.io.mongodb.cli:main" diff --git a/tests/retention/test_cli.py b/tests/retention/test_cli.py index b100e50a..01b7487c 100644 --- a/tests/retention/test_cli.py +++ b/tests/retention/test_cli.py @@ -61,7 +61,7 @@ def test_setup_verbose(caplog, cratedb, settings): assert result.exit_code == 0 assert cratedb.database.table_exists(settings.policy_table.fullname) is True - assert 3 <= len(caplog.records) <= 10 + assert 3 <= len(caplog.records) <= 15 def test_setup_dryrun(caplog, cratedb, settings): diff --git a/tests/wtf/__init__.py b/tests/wtf/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/wtf/test_cli.py b/tests/wtf/test_cli.py new file mode 100644 index 00000000..59d4d89a --- /dev/null +++ b/tests/wtf/test_cli.py @@ -0,0 +1,101 @@ +import json + +from boltons.iterutils import get_path +from click.testing import CliRunner + +from cratedb_toolkit.wtf.cli import cli + + +def test_wtf_cli_info(cratedb): + """ + Verify `cratedb-wtf info`. 
+ """ + + # Invoke command. + runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb.database.dburi}) + result = runner.invoke( + cli, + args="info", + catch_exceptions=False, + ) + assert result.exit_code == 0 + + # Verify outcome. + info = json.loads(result.output) + assert "meta" in info + assert "data" in info + + system_keys = list(get_path(info, ("data", "system")).keys()) + database_keys = list(get_path(info, ("data", "database")).keys()) + assert system_keys == ["remark", "application", "eco", "libraries"] + assert "cluster_name" in database_keys + assert "cluster_nodes_count" in database_keys + + +def test_wtf_cli_logs(cratedb): + """ + Verify `cratedb-wtf logs`. + """ + + # Invoke command. + runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb.database.dburi}) + result = runner.invoke( + cli, + args="logs", + catch_exceptions=False, + ) + assert result.exit_code == 0 + + # Verify outcome. + info = json.loads(result.output) + assert "meta" in info + assert "data" in info + + data_keys = list(info["data"].keys()) + assert "queries_users_latest" in data_keys + + +def test_wtf_cli_statistics_quick(cratedb): + """ + Verify `cratedb-wtf statistics quick`. + """ + + # Invoke command. + runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb.database.dburi}) + result = runner.invoke( + cli, + args="statistics quick", + catch_exceptions=False, + ) + assert result.exit_code == 0 + + # Verify outcome. + info = json.loads(result.output) + assert "meta" in info + assert "data" in info + + data_keys = list(info["data"].keys()) + assert "quickstats" in data_keys + + +def test_wtf_cli_statistics_view(cratedb): + """ + Verify `cratedb-wtf statistics view`. + """ + + # Invoke command. + runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb.database.dburi}) + result = runner.invoke( + cli, + args="statistics view", + catch_exceptions=False, + ) + assert result.exit_code == 0 + + # Verify outcome. 
+ info = json.loads(result.output) + assert "meta" in info + assert "data" in info + + data_keys = list(info["data"].keys()) + assert "stats" in data_keys