Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch storage over to timescaledb #9

Merged
merged 1 commit into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@

import structlog

from metrics import influxdb
from metrics.github.prs import process_prs
from metrics.logs import setup_logging
from metrics.timescaledb import TimescaleDBWriter
from metrics.tools.dates import date_from_iso, datetime_from_iso, iter_days


Expand All @@ -17,9 +17,6 @@
log = structlog.get_logger()


writer = influxdb.write


def get_data(db, orgs):
subprocess.check_call(["github-to-sqlite", "repos", db, *orgs])

Expand Down Expand Up @@ -83,7 +80,8 @@ def pr_queue(prs, org, start, days_threshold=None):
key = f"queue{suffix}"

log.info("%s | %s | %s | Processing %s PRs", key, day, org, len(prs_on_day))
process_prs(writer, key, prs_on_day, day)
with TimescaleDBWriter("github_pull_requests", f"queue{suffix}") as writer:
process_prs(writer, prs_on_day, day)


def pr_throughput(prs, org, start):
Expand Down Expand Up @@ -119,7 +117,8 @@ def next_weekday(d, weekday):

key = "throughput"
log.info("%s | %s | %s | Processing %s PRs", key, day, org, len(prs_in_range))
process_prs(writer, key, prs_in_range, day)
with TimescaleDBWriter("github_pull_requests", "throughput") as writer:
process_prs(writer, prs_in_range, day)


if __name__ == "__main__":
Expand Down
16 changes: 1 addition & 15 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,12 @@ services:
GF_DATABASE_SSL_MODE: disable
depends_on:
- db
- influxdb
- timescaledb
ports:
- 3000:3000
volumes:
- grafana:/var/lib/grafana

influxdb:
image: influxdb:latest
environment:
DOCKER_INFLUXDB_INIT_MODE: setup
DOCKER_INFLUXDB_INIT_USERNAME: admin
DOCKER_INFLUXDB_INIT_PASSWORD: admin
DOCKER_INFLUXDB_INIT_ORG: bennett
DOCKER_INFLUXDB_INIT_BUCKET: data
ports:
- 8086:8086
volumes:
- influxdb:/var/lib/influxdb2

timescaledb:
image: timescale/timescaledb-ha:pg14-latest
environment:
Expand All @@ -52,5 +39,4 @@ services:
volumes:
postgres:
grafana:
influxdb:
timescaledb:
4 changes: 3 additions & 1 deletion metrics/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@

@click.group()
@click.option("--debug", default=False, is_flag=True)
@click.option("--database-url", required=True, envvar="DATABASE_URL")
@click.pass_context
def cli(ctx, debug):
def cli(ctx, debug, database_url):
ctx.ensure_object(dict)

setup_logging(debug)

ctx.obj["DEBUG"] = debug
ctx.obj["DATABASE_URL"] = database_url


cli.add_command(github)
Expand Down
9 changes: 5 additions & 4 deletions metrics/github/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
import click
import structlog

from .. import influxdb
from ..timescaledb import TimescaleDBWriter
from . import api
from .prs import process_prs


log = structlog.get_logger()
writer = influxdb.write


@click.group()
Expand Down Expand Up @@ -42,7 +41,8 @@ def pr_queue(ctx, org, date, days_threshold):
suffix = f"_older_than_{days_threshold}_days" if days_threshold else ""

log.info("%s | %s | Processing %s PRs", date, org, len(prs))
process_prs(writer, f"queue{suffix}", prs, date)
with TimescaleDBWriter("github_pull_requests", f"queue{suffix}") as writer:
process_prs(writer, prs, date)


@github.command()
Expand All @@ -58,4 +58,5 @@ def pr_throughput(ctx, org, date, days):
prs = api.prs_opened_in_the_last_N_days(org, start, end)

log.info("%s | %s | Processing %s PRs", date, org, len(prs))
process_prs(writer, "throughput", prs, date)
with TimescaleDBWriter("github_pull_requests", "throughput") as writer:
process_prs(writer, prs, date)
13 changes: 5 additions & 8 deletions metrics/github/prs.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
def process_prs(writer, key, prs, date):
def process_prs(writer, prs, date):
"""
Given a list of PRs, break them down in series for writing

Expand All @@ -20,13 +20,10 @@ def process_prs(writer, key, prs, date):

org = list(orgs)[0]

writer(
f"github_pull_requests_{key}",
writer.write(
date,
len(prs_by_author_and_repo),
tags={
"author": author,
"organisation": org,
"repo": repo,
},
author=author,
organisation=org,
repo=repo,
)
65 changes: 0 additions & 65 deletions metrics/influxdb.py

This file was deleted.

18 changes: 6 additions & 12 deletions metrics/slack/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@

import click

from .. import influxdb
from ..timescaledb import TimescaleDBWriter
from .api import get_app, iter_messages


writer = influxdb.write


@click.group()
@click.option("--signing-secret", required=True, envvar="SLACK_SIGNING_SECRET")
@click.option("--token", required=True, envvar="SLACK_TOKEN")
Expand Down Expand Up @@ -38,11 +35,8 @@ def tech_support(ctx, date, tech_support_channel_id, backfill):

messages = iter_messages(app, tech_support_channel_id, date=day)

for date, messages in itertools.groupby(
messages, lambda m: datetime.fromtimestamp(float(m["ts"])).date()
):
writer(
"slack_tech_support_requests",
date,
len(list(messages)),
)
with TimescaleDBWriter("slack_tech_support", "requests") as writer:
for date, messages in itertools.groupby(
messages, lambda m: datetime.fromtimestamp(float(m["ts"])).date()
):
writer.write(date, len(list(messages)))
39 changes: 0 additions & 39 deletions metrics/timescaledb.py

This file was deleted.

6 changes: 6 additions & 0 deletions metrics/timescaledb/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Public package interface for metrics.timescaledb: expose only the writer.
from .writer import TimescaleDBWriter


__all__ = ["TimescaleDBWriter"]
19 changes: 19 additions & 0 deletions metrics/timescaledb/tables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
github_pull_requests = """
CREATE TABLE IF NOT EXISTS github_pull_requests (
time TIMESTAMP WITH TIME ZONE NOT NULL,
name TEXT NOT NULL,
value INTEGER NOT NULL,
author TEXT NOT NULL,
organisation TEXT NOT NULL,
repo TEXT NOT NULL,
CONSTRAINT github_pull_requests_must_be_different UNIQUE (time, name, author, repo)
);
"""
slack_tech_support = """
CREATE TABLE IF NOT EXISTS slack_tech_support (
time TIMESTAMP WITH TIME ZONE NOT NULL,
name TEXT NOT NULL,
value INTEGER NOT NULL,
CONSTRAINT slack_tech_support_must_be_different UNIQUE (time, name)
);
"""
76 changes: 76 additions & 0 deletions metrics/timescaledb/writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import os
from datetime import datetime, time

import psycopg
import structlog

from . import tables


log = structlog.get_logger()

# Read at import time: importing this module raises KeyError if DATABASE_URL
# is unset.  NOTE(review): metrics/cli.py also accepts a --database-url
# option (envvar DATABASE_URL) and stores it on the click context, but that
# value is not threaded through to here — confirm which is authoritative.
DATABASE_URL = os.environ["DATABASE_URL"]


def ensure_table(name):
    """Create the named table (if missing) and register it as a hypertable.

    The DDL for each table lives as a module-level string in ``tables``,
    keyed by the table name; ``create_hypertable`` is a no-op when the
    hypertable already exists.
    """
    ddl = getattr(tables, name)
    run(ddl)

    hypertable_sql = "SELECT create_hypertable(%s, 'time', if_not_exists => TRUE);"
    run(hypertable_sql, [name])


def run(sql, *args):
    """Execute ``sql`` against the configured database in a fresh connection.

    A new connection is opened per statement; the ``with`` block commits on
    success (rolls back on error) and closes the connection.  Because the
    connection is closed before this function returns, the returned cursor
    cannot be used to fetch rows — callers in this module run DDL/DML only
    and ignore the return value.
    """
    with psycopg.connect(DATABASE_URL) as conn:
        # Connection.execute creates a cursor, runs the statement and returns
        # that cursor — the original created a cursor it never closed.
        return conn.execute(sql, *args)


class TimescaleDBWriter:
    """Context manager that upserts metric rows into a TimescaleDB table.

    On ``__enter__`` the target table (and its hypertable registration) is
    created if missing; ``write`` then inserts one ``(time, name, value,
    *extra columns)`` row per call, bumping ``value`` on conflict with the
    table's ``{table}_must_be_different`` unique constraint.
    """

    def __init__(self, table, key):
        # The table name is interpolated into SQL statements, so refuse
        # anything that is not a plain identifier.
        if not table.isidentifier():
            raise ValueError(f"invalid table name: {table!r}")

        self.key = key
        self.table = table

    def __enter__(self):
        # make sure both the table and its hypertable config exist before
        # any writes happen
        ensure_table(self.table)

        return self

    def __exit__(self, *args):
        # run() opens and closes a connection per statement, so there is no
        # connection state to clean up here
        pass

    def write(self, date, value, **kwargs):
        """Upsert a single row at midnight of ``date`` for this writer's key.

        Extra keyword arguments become extra column values (e.g. author,
        organisation, repo for github_pull_requests); their names must be
        valid identifiers because they are interpolated into the SQL.
        """
        # convert date to a timestamp
        # TODO: do we need to do any checking to make sure this is tz-aware and in
        # UTC?
        dt = datetime.combine(date, time())

        # Column names cannot be parameterized, so they are interpolated into
        # the statement below — validate them first to avoid broken (or
        # injectable) SQL.
        for name in kwargs:
            if not name.isidentifier():
                raise ValueError(f"invalid column name: {name!r}")

        extra_fields = "".join(f", {k}" for k in kwargs)
        placeholders = ", %s" * len(kwargs)

        # rows are unique per the table's `{name}_must_be_different`
        # constraint and we always want to bump the value on conflict
        sql = f"""
        INSERT INTO {self.table} (time, name, value {extra_fields})
        VALUES (%s, %s, %s {placeholders})
        ON CONFLICT ON CONSTRAINT {self.table}_must_be_different DO UPDATE SET value = EXCLUDED.value;
        """

        run(sql, (dt, self.key, value, *kwargs.values()))

        log.debug(
            self.key,
            date=dt.isoformat(),
            value=value,
            **kwargs,
        )
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ requires-python = ">=3.11"
dependencies = [
"click",
"github-to-sqlite",
"influxdb-client",
"requests",
"psycopg[binary]",
"slack-bolt",
Expand Down
Loading