diff --git a/backfill.py b/backfill.py index b7e44e68..ff8ccd38 100644 --- a/backfill.py +++ b/backfill.py @@ -6,9 +6,9 @@ import structlog -from metrics import influxdb from metrics.github.prs import process_prs from metrics.logs import setup_logging +from metrics.timescaledb import TimescaleDBWriter from metrics.tools.dates import date_from_iso, datetime_from_iso, iter_days @@ -17,9 +17,6 @@ log = structlog.get_logger() -writer = influxdb.write - - def get_data(db, orgs): subprocess.check_call(["github-to-sqlite", "repos", db, *orgs]) @@ -83,7 +80,8 @@ def pr_queue(prs, org, start, days_threshold=None): key = f"queue{suffix}" log.info("%s | %s | %s | Processing %s PRs", key, day, org, len(prs_on_day)) - process_prs(writer, key, prs_on_day, day) + with TimescaleDBWriter("github_pull_requests", f"queue{suffix}") as writer: + process_prs(writer, prs_on_day, day) def pr_throughput(prs, org, start): @@ -119,7 +117,8 @@ def next_weekday(d, weekday): key = "throughput" log.info("%s | %s | %s | Processing %s PRs", key, day, org, len(prs_in_range)) - process_prs(writer, key, prs_in_range, day) + with TimescaleDBWriter("github_pull_requests", "throughput") as writer: + process_prs(writer, prs_in_range, day) if __name__ == "__main__": diff --git a/docker-compose.yaml b/docker-compose.yaml index d6e4e270..bffd2090 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -21,25 +21,12 @@ services: GF_DATABASE_SSL_MODE: disable depends_on: - db - - influxdb + - timescaledb ports: - 3000:3000 volumes: - grafana:/var/lib/grafana - influxdb: - image: influxdb:latest - environment: - DOCKER_INFLUXDB_INIT_MODE: setup - DOCKER_INFLUXDB_INIT_USERNAME: admin - DOCKER_INFLUXDB_INIT_PASSWORD: admin - DOCKER_INFLUXDB_INIT_ORG: bennett - DOCKER_INFLUXDB_INIT_BUCKET: data - ports: - - 8086:8086 - volumes: - - influxdb:/var/lib/influxdb2 - timescaledb: image: timescale/timescaledb-ha:pg14-latest environment: @@ -52,5 +39,4 @@ services: volumes: postgres: grafana: - influxdb: 
timescaledb: diff --git a/metrics/cli.py b/metrics/cli.py index 789c49be..92e98d84 100644 --- a/metrics/cli.py +++ b/metrics/cli.py @@ -7,13 +7,15 @@ @click.group() @click.option("--debug", default=False, is_flag=True) +@click.option("--database-url", required=True, envvar="DATABASE_URL") @click.pass_context -def cli(ctx, debug): +def cli(ctx, debug, database_url): ctx.ensure_object(dict) setup_logging(debug) ctx.obj["DEBUG"] = debug + ctx.obj["DATABASE_URL"] = database_url cli.add_command(github) diff --git a/metrics/github/cli.py b/metrics/github/cli.py index e55bdc1e..bdd8c4c6 100644 --- a/metrics/github/cli.py +++ b/metrics/github/cli.py @@ -3,13 +3,12 @@ import click import structlog -from .. import influxdb +from ..timescaledb import TimescaleDBWriter from . import api from .prs import process_prs log = structlog.get_logger() -writer = influxdb.write @click.group() @@ -42,7 +41,8 @@ def pr_queue(ctx, org, date, days_threshold): suffix = f"_older_than_{days_threshold}_days" if days_threshold else "" log.info("%s | %s | Processing %s PRs", date, org, len(prs)) - process_prs(writer, f"queue{suffix}", prs, date) + with TimescaleDBWriter("github_pull_requests", f"queue{suffix}") as writer: + process_prs(writer, prs, date) @github.command() @@ -58,4 +58,5 @@ def pr_throughput(ctx, org, date, days): prs = api.prs_opened_in_the_last_N_days(org, start, end) log.info("%s | %s | Processing %s PRs", date, org, len(prs)) - process_prs(writer, "throughput", prs, date) + with TimescaleDBWriter("github_pull_requests", "throughput") as writer: + process_prs(writer, prs, date) diff --git a/metrics/github/prs.py b/metrics/github/prs.py index 7ace17a6..421ddc30 100644 --- a/metrics/github/prs.py +++ b/metrics/github/prs.py @@ -1,4 +1,4 @@ -def process_prs(writer, key, prs, date): +def process_prs(writer, prs, date): """ Given a list of PRs, break them down in series for writing @@ -20,13 +20,10 @@ def process_prs(writer, key, prs, date): org = list(orgs)[0] - writer( - 
f"github_pull_requests_{key}", + writer.write( date, len(prs_by_author_and_repo), - tags={ - "author": author, - "organisation": org, - "repo": repo, - }, + author=author, + organisation=org, + repo=repo, ) diff --git a/metrics/influxdb.py b/metrics/influxdb.py deleted file mode 100644 index ce4b34d5..00000000 --- a/metrics/influxdb.py +++ /dev/null @@ -1,65 +0,0 @@ -import os -from datetime import UTC, datetime, time - -import influxdb_client -import structlog -from influxdb_client import Point -from influxdb_client.client.write_api import WriteOptions, WriteType - - -log = structlog.get_logger() - -TOKEN = os.environ["INFLUXDB_TOKEN"] -BUCKET = "data" -ORG = "bennett" -URL = "http://localhost:8086" - -client = influxdb_client.InfluxDBClient(url=URL, token=TOKEN, org=ORG) -delete_api = client.delete_api() -# write_api = client.write_api(write_options=SYNCHRONOUS) -write_api = client.write_api( - write_options=WriteOptions( - write_type=WriteType.synchronous, - batch_size=1000, - ) -) - - -def delete(key): - measurement = f"_measurement={key}" - log.info("Removing %s", key) - - start = "1970-01-01T00:00:00Z" - stop = datetime.now(tz=UTC).isoformat(timespec="seconds") - - delete_api.delete(start, stop, measurement, bucket=BUCKET, org=ORG) - - -def write(measurement, date, value, tags=None): - if tags is None: - tags = {} - - # convert date to a timestamp - # TODO: do we need to do any checking to make sure this is tz-aware and in - # UTC? 
- dt = datetime.combine(date, time()) - - point = Point(measurement).field("number", value).time(dt) - - for k, v in tags.items(): - point = point.tag(k, v) - - write_api.write(bucket=BUCKET, org=ORG, record=point) - - log.debug( - measurement, - date=dt.isoformat(), - number=value, - **tags, - ) - - -if __name__ == "__main__": - print(datetime.now()) - point = Point("github.pull_requests").field("testing", 79).time(datetime.now()) - write_api.write(bucket=BUCKET, org=ORG, record=point) diff --git a/metrics/slack/cli.py b/metrics/slack/cli.py index d4467546..c0b7481a 100644 --- a/metrics/slack/cli.py +++ b/metrics/slack/cli.py @@ -3,13 +3,10 @@ import click -from .. import influxdb +from ..timescaledb import TimescaleDBWriter from .api import get_app, iter_messages -writer = influxdb.write - - @click.group() @click.option("--signing-secret", required=True, envvar="SLACK_SIGNING_SECRET") @click.option("--token", required=True, envvar="SLACK_TOKEN") @@ -38,11 +35,8 @@ def tech_support(ctx, date, tech_support_channel_id, backfill): messages = iter_messages(app, tech_support_channel_id, date=day) - for date, messages in itertools.groupby( - messages, lambda m: datetime.fromtimestamp(float(m["ts"])).date() - ): - writer( - "slack_tech_support_requests", - date, - len(list(messages)), - ) + with TimescaleDBWriter("slack_tech_support", "requests") as writer: + for date, messages in itertools.groupby( + messages, lambda m: datetime.fromtimestamp(float(m["ts"])).date() + ): + writer.write(date, len(list(messages))) diff --git a/metrics/timescaledb.py b/metrics/timescaledb.py deleted file mode 100644 index f1b50f70..00000000 --- a/metrics/timescaledb.py +++ /dev/null @@ -1,39 +0,0 @@ -from datetime import datetime, time - -import psycopg -import structlog - - -log = structlog.get_logger() - -CONNECTION = "postgres://postgres:password@localhost:5433/tsdb" - - -def run(sql, *args): - with psycopg.connect(CONNECTION) as conn: - cursor = conn.cursor() - - return 
cursor.execute(sql, *args) - - -def write(key, date, value, tags=None): - # convert date to a timestamp - # TODO: do we need to do any checking to make sure this is tz-aware and in - # UTC? - dt = datetime.combine(date, time()) - - name = key.removeprefix("github_pull_requests_") - - sql = """ - INSERT INTO github_pull_requests (time, name, value, author, organisation, repo) - VALUES (%s, %s, %s, %s, %s, %s); - """ - - run(sql, (dt, name, value, *tags.values())) - - log.debug( - name, - date=dt.isoformat(), - value=value, - **tags, - ) diff --git a/metrics/timescaledb/__init__.py b/metrics/timescaledb/__init__.py new file mode 100644 index 00000000..42d2ae1d --- /dev/null +++ b/metrics/timescaledb/__init__.py @@ -0,0 +1,6 @@ +from .writer import TimescaleDBWriter + + +__all__ = [ + "TimescaleDBWriter", +] diff --git a/metrics/timescaledb/tables.py b/metrics/timescaledb/tables.py new file mode 100644 index 00000000..963ee643 --- /dev/null +++ b/metrics/timescaledb/tables.py @@ -0,0 +1,19 @@ +github_pull_requests = """ +CREATE TABLE IF NOT EXISTS github_pull_requests ( + time TIMESTAMP WITH TIME ZONE NOT NULL, + name TEXT NOT NULL, + value INTEGER NOT NULL, + author TEXT NOT NULL, + organisation TEXT NOT NULL, + repo TEXT NOT NULL, + CONSTRAINT github_pull_requests_must_be_different UNIQUE (time, name, author, repo) +); +""" +slack_tech_support = """ +CREATE TABLE IF NOT EXISTS slack_tech_support ( + time TIMESTAMP WITH TIME ZONE NOT NULL, + name TEXT NOT NULL, + value INTEGER NOT NULL, + CONSTRAINT slack_tech_support_must_be_different UNIQUE (time, name) +); +""" diff --git a/metrics/timescaledb/writer.py b/metrics/timescaledb/writer.py new file mode 100644 index 00000000..cf2233e4 --- /dev/null +++ b/metrics/timescaledb/writer.py @@ -0,0 +1,76 @@ +import os +from datetime import datetime, time + +import psycopg +import structlog + +from . 
import tables + + +log = structlog.get_logger() + +DATABASE_URL = os.environ["DATABASE_URL"] + + +def ensure_table(name): + """ + Ensure both the table and hypertable config exist in the database + """ + run(getattr(tables, name)) + + run( + "SELECT create_hypertable(%s, 'time', if_not_exists => TRUE);", + [name], + ) + + +def run(sql, *args): + with psycopg.connect(DATABASE_URL) as conn: + cursor = conn.cursor() + + return cursor.execute(sql, *args) + + +class TimescaleDBWriter: + def __init__(self, table, key): + self.key = key + self.table = table + + def __enter__(self): + ensure_table(self.table) + + return self + + def __exit__(self, *args): + pass + + def write(self, date, value, **kwargs): + # convert date to a timestamp + # TODO: do we need to do any checking to make sure this is tz-aware and in + # UTC? + dt = datetime.combine(date, time()) + + # insert into the table set at instantiation + # unique by the table's `{name}_must_be_different` and we always want to + # bump the value if that triggers a conflict + # the columns could differ per table… do we want an object to represent tables?
+ if kwargs: + extra_fields = ", " + ", ".join(kwargs.keys()) + placeholders = ", " + ", ".join(["%s" for k in kwargs.keys()]) + else: + extra_fields = "" + placeholders = "" + sql = f""" + INSERT INTO {self.table} (time, name, value {extra_fields}) + VALUES (%s, %s, %s {placeholders}) + ON CONFLICT ON CONSTRAINT {self.table}_must_be_different DO UPDATE SET value = EXCLUDED.value; + """ + + run(sql, (dt, self.key, value, *kwargs.values())) + + log.debug( + self.key, + date=dt.isoformat(), + value=value, + **kwargs, + ) diff --git a/pyproject.toml b/pyproject.toml index 30674dc2..baeacc7b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,7 +15,6 @@ requires-python = ">=3.11" dependencies = [ "click", "github-to-sqlite", - "influxdb-client", "requests", "psycopg[binary]", "slack-bolt", diff --git a/remove-all-measurements.py b/remove-all-measurements.py deleted file mode 100755 index 243b340c..00000000 --- a/remove-all-measurements.py +++ /dev/null @@ -1,17 +0,0 @@ -from metrics import influxdb -from metrics.logs import setup_logging - - -setup_logging() - -measurements = [ - "github_pull_requests_queue", - "github_pull_requests_queue_older_than_2_days", - "github_pull_requests_queue_older_than_10_days", - "github_pull_requests_queue_older_than_30_days", - "github_pull_requests_queue_older_than_60_days", - "github_pull_requests_throughput", -] - -for measurement in measurements: - influxdb.delete(measurement) diff --git a/requirements.prod.txt b/requirements.prod.txt index def51d7f..38d6396e 100644 --- a/requirements.prod.txt +++ b/requirements.prod.txt @@ -7,9 +7,7 @@ certifi==2023.7.22 \ --hash=sha256:539cc1d13202e33ca466e88b2807e29f4c13049d6d87031a3c110744495cb082 \ --hash=sha256:92d6037539857d8206b8f6ae472e8b77db8058fec5937a1ef3f54304089edbb9 - # via - # influxdb-client - # requests + # via requests charset-normalizer==3.3.2 \ --hash=sha256:06435b539f889b1f6f4ac1758871aae42dc3a8c0e24ac9e60c2384973ad73027 \ 
--hash=sha256:06a81e93cd441c56a9b65d8e1d043daeb97a3d0856d177d5c90ba85acb3db087 \ @@ -121,10 +119,6 @@ idna==3.4 \ --hash=sha256:814f528e8dead7d329833b91c5faa87d60bf71824cd12a7530b5526063d02cb4 \ --hash=sha256:90b77e79eaa3eba6de819a0c442c0b4ceefc341a7a2ab77d7562bf49f425c5c2 # via requests -influxdb-client==1.38.0 \ - --hash=sha256:7d04c06b833800be3350c6cb8f19f01f3f4ab33a77a24969568a141e4e132358 \ - --hash=sha256:88ee8c1beb6b3b1359f4117d51704d5da5ac70e598b9fe786823e36ac86175a8 - # via metrics (pyproject.toml) pluggy==1.3.0 \ --hash=sha256:cf61ae8f126ac6f7c451172cf30e3e43d3ca77615509771b3a984a0730651e12 \ --hash=sha256:d89c696a773f8bd377d18e5ecda92b7a3793cbe66c87060a6fb58c7b6e1061f7 @@ -203,9 +197,7 @@ psycopg-binary==3.1.12 \ python-dateutil==2.8.2 \ --hash=sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86 \ --hash=sha256:961d03dc3453ebbc59dbdea9e4e11c5651520a876d0f4db161e8674aae935da9 - # via - # influxdb-client - # sqlite-utils + # via sqlite-utils pyyaml==6.0.1 \ --hash=sha256:04ac92ad1925b2cff1db0cfebffb6ffc43457495c9b3c39d3fcae417d7125dc5 \ --hash=sha256:062582fca9fabdd2c8b54a3ef1c978d786e0f6b3a1510e0ac93ef59e0ddae2bc \ @@ -258,10 +250,6 @@ pyyaml==6.0.1 \ --hash=sha256:fd1592b3fdf65fff2ad0004b5e363300ef59ced41c2e6b3a99d4089fa8c5435d \ --hash=sha256:fd66fc5d0da6d9815ba2cebeb4205f95818ff4b79c3ebe268e75d961704af52f # via github-to-sqlite -reactivex==4.0.4 \ - --hash=sha256:0004796c420bd9e68aad8e65627d85a8e13f293de76656165dffbcb3a0e3fb6a \ - --hash=sha256:e912e6591022ab9176df8348a653fe8c8fa7a301f26f9931c9d8c78a650e04e8 - # via influxdb-client requests==2.31.0 \ --hash=sha256:58cd2187c01e70e6e26505bca751777aa9f2ee0b7f4300988b709f44e013003f \ --hash=sha256:942c5a758f98d790eaed1a29cb6eefc7ffb0d1cf7af05c3d2791656dbd6ad1e1 @@ -299,18 +287,8 @@ tabulate==0.9.0 \ typing-extensions==4.8.0 \ --hash=sha256:8f92fc8806f9a6b641eaa5318da32b44d401efaac0f6678c9bc448ba3605faa0 \ --hash=sha256:df8e4339e9cb77357558cbdbceca33c303714cf861d1eef15e1070055ae8b7ef - # 
via - # psycopg - # reactivex + # via psycopg urllib3==2.0.7 \ --hash=sha256:c97dfde1f7bd43a71c8d2a58e369e9b2bf692d1334ea9f9cae55add7d0dd0f84 \ --hash=sha256:fdb6d215c776278489906c2f8916e6e7d4f5a9b602ccbcfdf7f016fc8da0596e - # via - # influxdb-client - # requests - -# The following packages are considered to be unsafe in a requirements file: -setuptools==68.2.2 \ - --hash=sha256:4ac1475276d2f1c48684874089fefcd83bd7162ddaafb81fac866ba0db282a87 \ - --hash=sha256:b454a35605876da60632df1a60f736524eb73cc47bbc9f3f1ef1b644de74fd2a - # via influxdb-client + # via requests diff --git a/setup-timescaledb-tables.py b/setup-timescaledb-tables.py deleted file mode 100644 index 88560701..00000000 --- a/setup-timescaledb-tables.py +++ /dev/null @@ -1,33 +0,0 @@ -from metrics.timescaledb import run - - -def create_table(name): - run( - f""" - CREATE TABLE IF NOT EXISTS {name} ( - time TIMESTAMP WITH TIME ZONE NOT NULL, - name TEXT NOT NULL, - value INTEGER NOT NULL, - author TEXT NOT NULL, - organisation TEXT NOT NULL, - repo TEXT NOT NULL - ); - """ - ) - run("SELECT create_hypertable(%s, 'time');", (name,)) - run(f"CREATE INDEX IF NOT EXISTS idx_{name}_time ON {name} (name, time DESC);") - - -names = [ - "github_pull_requests_queue", - "github_pull_requests_queue_older_than_2_days", - "github_pull_requests_queue_older_than_10_days", - "github_pull_requests_queue_older_than_30_days", - "github_pull_requests_queue_older_than_60_days", - "github_pull_requests_throughput", -] - -# for name in names: -# create_table(name) - -create_table("github_pull_requests")