Skip to content

Commit

Permalink
wtf: Add recorder for outcomes of info and job-info
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Apr 17, 2024
1 parent fc1160b commit 95e2c23
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 7 deletions.
7 changes: 3 additions & 4 deletions cratedb_toolkit/util/database.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2023, Crate.io Inc.
# Copyright (c) 2023-2024, Crate.io Inc.
# Distributed under the terms of the AGPLv3 license, see LICENSE.
import io
import os
Expand Down Expand Up @@ -28,10 +28,9 @@ class DatabaseAdapter:
Wrap SQLAlchemy connection to database.
"""

def __init__(self, dburi: str):
def __init__(self, dburi: str, echo: bool = False):
self.dburi = dburi
# TODO: Make `echo=True` configurable.
self.engine = sa.create_engine(self.dburi, echo=False)
self.engine = sa.create_engine(self.dburi, echo=echo)
self.connection = self.engine.connect()

def run_sql(self, sql: t.Union[str, Path, io.IOBase], records: bool = False, ignore: str = None):
Expand Down
7 changes: 6 additions & 1 deletion cratedb_toolkit/wtf/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,17 @@ Display database cluster log messages.
cratedb-wtf logs
```

Statistics.
Collect and display job statistics.
```shell
cratedb-wtf job-statistics collect
cratedb-wtf job-statistics view
```

Record complete outcomes of `info` and `job-info`.
```shell
cratedb-wtf record
```


## HTTP API

Expand Down
13 changes: 12 additions & 1 deletion cratedb_toolkit/wtf/cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021-2023, Crate.io Inc.
# Copyright (c) 2021-2024, Crate.io Inc.
# Distributed under the terms of the AGPLv3 license, see LICENSE.
import logging
import os
Expand All @@ -16,6 +16,7 @@
)
from cratedb_toolkit.util.data import jd
from cratedb_toolkit.wtf.core import InfoContainer, JobInfoContainer, LogContainer
from cratedb_toolkit.wtf.recorder import InfoRecorder

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -178,6 +179,16 @@ def job_statistics_view(ctx: click.Context):
jd(response)


@make_command(cli, "record", "Record `info` and `job-info` outcomes.")
@click.pass_context
def record(ctx: click.Context):
    # Documented with comments instead of a docstring on purpose: click may pick
    # up a command's docstring as help text, which would alter CLI output.
    #
    # Read both settings from the click context first, then connect to the
    # database and hand the adapter to the recorder.
    meta = ctx.meta
    dburi, scrub = meta["cratedb_sqlalchemy_url"], meta.get("scrub", False)
    recorder = InfoRecorder(adapter=DatabaseAdapter(dburi=dburi, echo=False), scrub=scrub)
    # Kicks off the periodic recording implemented by `InfoRecorder`.
    recorder.record_forever()


@make_command(cli, "serve", help_serve)
@click.option("--listen", type=click.STRING, default=None, help="HTTP server listen address")
@click.option("--reload", is_flag=True, help="Dynamically reload changed files")
Expand Down
6 changes: 5 additions & 1 deletion cratedb_toolkit/wtf/core.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Copyright (c) 2021-2024, Crate.io Inc.
# Distributed under the terms of the AGPLv3 license, see LICENSE.
import typing as t

import boltons.ecoutils

from cratedb_toolkit.util.platform import PlatformInfo
Expand Down Expand Up @@ -35,13 +37,15 @@ def to_dict(self, data=None):
return super().to_dict(data={"system": self.system(), "database": self.database()})

def system(self):
data = {}
data: t.Dict[str, t.Any] = {}
data["remark"] = (
"This section includes system information about the machine running CrateDB "
'Toolkit, effectively about the "compute" domain.'
)
data["application"] = PlatformInfo.application()
data["eco"] = boltons.ecoutils.get_profile(scrub=self.scrub)
# `version_info` is a list of mixed data types: [3, 11, 6, "final", 0].
data["eco"]["python"]["version_info"] = str(data["eco"]["python"]["version_info"])
# data["libraries"] = PlatformInfo.libraries() # noqa: ERA001
return data

Expand Down
57 changes: 57 additions & 0 deletions cratedb_toolkit/wtf/recorder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Copyright (c) 2023-2024, Crate.io Inc.
# Distributed under the terms of the AGPLv3 license, see LICENSE.
import logging
import threading
import time

import sqlalchemy as sa

from cratedb_toolkit.util import DatabaseAdapter
from cratedb_toolkit.wtf.core import InfoContainer, JobInfoContainer

logger = logging.getLogger(__name__)


class InfoRecorder:
    """
    Record complete outcomes of `info` and `job-info`.

    Every snapshot is stored as one row per table, using the `info` OBJECT
    column of `clusterinfo_table` and `jobinfo_table`. Both tables are created
    on demand with `CREATE TABLE IF NOT EXISTS`.
    """

    clusterinfo_table = "ext.clusterinfo"
    jobinfo_table = "ext.jobinfo"
    # Default pause between two snapshots. Can be overridden per instance
    # through the `interval_seconds` constructor argument.
    interval_seconds = 10

    def __init__(self, adapter: DatabaseAdapter, scrub: bool = False, interval_seconds: float = None):
        """
        :param adapter: Database adapter. It is handed to the info containers,
            and its connection is used to store the snapshots.
        :param scrub: Forwarded verbatim to the info containers.
        :param interval_seconds: Optional pause between two snapshots, in seconds.
            When omitted, the class-level default is used.
        :raises ValueError: When `interval_seconds` is negative.
        """
        self.adapter = adapter
        self.scrub = scrub
        if interval_seconds is not None:
            # `time.sleep()` rejects negative values. Fail early here, instead of
            # letting the recorder thread die after taking its first snapshot.
            if interval_seconds < 0:
                raise ValueError(f"interval_seconds must not be negative: {interval_seconds}")
            self.interval_seconds = interval_seconds

    def record_once(self):
        """
        Take one snapshot of `info` and `job-info`, and store each into its table.

        Errors are not handled here, they propagate to the caller.
        """
        logger.info("Recording information snapshot")
        samples = (
            (self.clusterinfo_table, InfoContainer(adapter=self.adapter, scrub=self.scrub)),
            (self.jobinfo_table, JobInfoContainer(adapter=self.adapter, scrub=self.scrub)),
        )
        for table, sample in samples:
            self._store(table, sample)

    def _store(self, table: str, sample):
        """
        Make sure `table` exists, then insert the outcome of `sample` as a single row.
        """
        connection = self.adapter.connection
        # The table name is interpolated, because identifiers can not be bound as
        # parameters. It originates from class attributes, not from user input.
        connection.execute(
            sa.text(
                f"""
                CREATE TABLE IF NOT EXISTS {table}
                (time TIMESTAMP DEFAULT NOW(), info OBJECT)
                """
            )
        )
        # The sample is evaluated only after the DDL statement succeeded.
        connection.execute(
            sa.text(f"INSERT INTO {table} (info) VALUES (:info)"), {"info": sample.to_dict()["data"]}  # noqa: S608
        )

    def record_forever(self):
        """
        Start a background thread recording snapshots periodically.

        This method returns right after starting the thread. The thread's
        `daemon` flag is left at its default.
        """
        logger.info("Starting to record information snapshot each %s seconds", self.interval_seconds)
        thread = threading.Thread(target=self.do_record_forever)
        thread.start()

    def do_record_forever(self):
        """
        Record snapshots in an endless loop, pausing `interval_seconds` in between.

        A failing snapshot is logged including its traceback, and does not
        terminate the loop.
        """
        while True:
            try:
                self.record_once()
            except Exception:
                logger.exception("Failed to record information snapshot")
            time.sleep(self.interval_seconds)

0 comments on commit 95e2c23

Please sign in to comment.