Skip to content

Commit

Permalink
Merge pull request #21 from ebmdatalab/sqlalchemy
Browse files Browse the repository at this point in the history
Switch to SQLAlchemy from raw strings
  • Loading branch information
ghickman authored Nov 14, 2023
2 parents f0b076b + 8e47869 commit 5e7278c
Show file tree
Hide file tree
Showing 11 changed files with 211 additions and 87 deletions.
2 changes: 1 addition & 1 deletion dotenv-sample
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# The DSN for accessing the timescaledb database
TIMESCALEDB_URL=postgres://user:pass@localhost:5433/metrics
TIMESCALEDB_URL=postgresql://user:pass@localhost:5433/metrics

# API token for pulling data from Github
GITHUB_TOKEN=
Expand Down
21 changes: 11 additions & 10 deletions metrics/github/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
import click
import structlog

from metrics.github.prs import process_prs
from metrics.logs import setup_logging
from metrics.timescaledb import TimescaleDBWriter
from metrics.tools.dates import date_from_iso, datetime_from_iso, iter_days
from ..logs import setup_logging
from ..timescaledb import TimescaleDBWriter
from ..timescaledb.tables import GitHubPullRequests
from ..tools.dates import date_from_iso, datetime_from_iso, iter_days
from .prs import process_prs


setup_logging()
Expand Down Expand Up @@ -79,8 +80,8 @@ def pr_queue(prs, org, start, days_threshold=None):
key = f"queue{suffix}"

log.info("%s | %s | %s | Processing %s PRs", key, day, org, len(prs_on_day))
with TimescaleDBWriter("github_pull_requests", f"queue{suffix}") as writer:
process_prs(writer, prs_on_day, day)
with TimescaleDBWriter(GitHubPullRequests) as writer:
process_prs(writer, prs_on_day, day, f"queue{suffix}")


def pr_throughput(prs, org, start):
Expand Down Expand Up @@ -116,16 +117,16 @@ def next_weekday(d, weekday):

key = "throughput"
log.info("%s | %s | %s | Processing %s PRs", key, day, org, len(prs_in_range))
with TimescaleDBWriter("github_pull_requests", "throughput") as writer:
process_prs(writer, prs_in_range, day)
with TimescaleDBWriter(GitHubPullRequests) as writer:
process_prs(writer, prs_in_range, day, name="throughput")


@click.command()
@click.argument("org")
@click.option("--pull-data", is_flag=True, default=False)
@click.option("--db-path", type=str)
@click.option("--db-path", type=str, default="github.db")
@click.pass_context
def backfill(ctx, org, pull_data, db_path="github.db"):
def backfill(ctx, org, pull_data, db_path):
"""Backfill GitHub data for the given GitHub ORG"""
if pull_data:
# clean up existing db
Expand Down
7 changes: 4 additions & 3 deletions metrics/github/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import structlog

from ..timescaledb import TimescaleDBWriter
from ..timescaledb.tables import GitHubPullRequests
from . import api
from .backfill import backfill
from .prs import process_prs
Expand Down Expand Up @@ -54,13 +55,13 @@ def pr_queue(ctx, org, date, days_threshold):
def pr_throughput(ctx, org, date, days):
"""PRs opened in the last number of days given"""
end = date.date()
start = date - timedelta(days=days)
start = end - timedelta(days=days)

prs = api.prs_opened_in_the_last_N_days(org, start, end)

log.info("%s | %s | Processing %s PRs", date, org, len(prs))
with TimescaleDBWriter("github_pull_requests", "throughput") as writer:
process_prs(writer, prs, date)
with TimescaleDBWriter(GitHubPullRequests) as writer:
process_prs(writer, prs, date, name="throughput")


github.add_command(backfill)
3 changes: 2 additions & 1 deletion metrics/github/prs.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
def process_prs(writer, prs, date):
def process_prs(writer, prs, date, name=""):
"""
Given a list of PRs, break them down in series for writing
Expand All @@ -23,6 +23,7 @@ def process_prs(writer, prs, date):
writer.write(
date,
len(prs_by_author_and_repo),
name=name,
author=author,
organisation=org,
repo=repo,
Expand Down
5 changes: 5 additions & 0 deletions metrics/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ def setup_logging(debug=False):
"level": "DEBUG" if debug else "INFO",
"propagate": True,
},
"sqlalchemy": {
"handlers": ["console"],
"level": "WARNING",
"propagate": False,
},
},
}
)
Expand Down
5 changes: 3 additions & 2 deletions metrics/slack/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import click

from ..timescaledb import TimescaleDBWriter
from ..timescaledb.tables import SlackTechSupport
from .api import get_app, iter_messages


Expand Down Expand Up @@ -35,8 +36,8 @@ def tech_support(ctx, date, tech_support_channel_id, backfill):

messages = iter_messages(app, tech_support_channel_id, date=day)

with TimescaleDBWriter("slack_tech_support", "requests") as writer:
with TimescaleDBWriter(SlackTechSupport) as writer:
for date, messages in itertools.groupby(
messages, lambda m: datetime.fromtimestamp(float(m["ts"])).date()
):
writer.write(date, len(list(messages)))
writer.write(date, len(list(messages)), name="requests")
42 changes: 23 additions & 19 deletions metrics/timescaledb/tables.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
github_pull_requests = """
CREATE TABLE IF NOT EXISTS github_pull_requests (
time TIMESTAMP WITH TIME ZONE NOT NULL,
name TEXT NOT NULL,
value INTEGER NOT NULL,
author TEXT NOT NULL,
organisation TEXT NOT NULL,
repo TEXT NOT NULL,
CONSTRAINT github_pull_requests_must_be_different UNIQUE (time, name, author, repo)
);
"""
slack_tech_support = """
CREATE TABLE IF NOT EXISTS slack_tech_support (
time TIMESTAMP WITH TIME ZONE NOT NULL,
name TEXT NOT NULL,
value INTEGER NOT NULL,
CONSTRAINT slack_tech_support_must_be_different UNIQUE (time, name)
);
"""
from sqlalchemy import TIMESTAMP, Column, Integer, MetaData, Table, Text


# Single shared registry for all TimescaleDB table definitions in this module.
metadata = MetaData()

# One metric observation per (time, name, author, repo) for GitHub PRs.
# The composite primary key mirrors the UNIQUE constraint of the previous
# raw-SQL schema (time, name, author, repo), so the writer can target it
# for ON CONFLICT upserts.
GitHubPullRequests = Table(
    "github_pull_requests",
    metadata,
    Column("time", TIMESTAMP(timezone=True), primary_key=True),
    Column("name", Text, primary_key=True),
    Column("value", Integer),
    Column("author", Text, primary_key=True),
    Column("organisation", Text),
    Column("repo", Text, primary_key=True),
)

# One metric observation per (time, name) for Slack tech-support requests.
SlackTechSupport = Table(
    "slack_tech_support",
    metadata,
    Column("time", TIMESTAMP(timezone=True), primary_key=True),
    Column("name", Text, primary_key=True),
    Column("value", Integer),
)
90 changes: 43 additions & 47 deletions metrics/timescaledb/writer.py
Original file line number Diff line number Diff line change
@@ -1,79 +1,75 @@
import os
from datetime import datetime, time

import psycopg
import structlog

from . import tables
from sqlalchemy import create_engine, inspect, schema, text
from sqlalchemy.dialects.postgresql import insert


log = structlog.get_logger()

TIMESCALEDB_URL = os.environ["TIMESCALEDB_URL"]
# Note: psycopg2 is still the default postgres dialect for sqlalchemy so we
# inject +psycopg to enable using v3
TIMESCALEDB_URL = os.environ["TIMESCALEDB_URL"].replace(
"postgresql", "postgresql+psycopg"
)


def ensure_table(name):
def ensure_table(engine, table):
"""
Ensure both the table and hypertable config exist in the database
"""
run(getattr(tables, name))

run(
"SELECT create_hypertable(%s, 'time', if_not_exists => TRUE);",
[name],
)

# ensure the RO grafana user can read the table
run(f"GRANT SELECT ON {name} TO grafanareader")


def run(sql, *args):
with psycopg.connect(TIMESCALEDB_URL) as conn:
cursor = conn.cursor()
with engine.begin() as connection:
connection.execute(schema.CreateTable(table, if_not_exists=True))

with engine.begin() as connection:
connection.execute(
text(
f"SELECT create_hypertable('{table.name}', 'time', if_not_exists => TRUE);"
)
)

return cursor.execute(sql, *args)
# ensure the RO grafana user can read the table
connection.execute(text(f"GRANT SELECT ON {table.name} TO grafanareader"))


class TimescaleDBWriter:
def __init__(self, table, key):
self.key = key
inserts = []

def __init__(self, table):
self.table = table
self.engine = create_engine(TIMESCALEDB_URL)

def __enter__(self):
ensure_table(self.table)
ensure_table(self.engine, self.table)

return self

def __exit__(self, *args):
pass
with self.engine.begin() as connection:
for stmt in self.inserts:
connection.execute(stmt)

def write(self, date, value, **kwargs):
# convert date to a timestamp
# TODO: do we need to do any checking to make sure this is tz-aware and in
# UTC?
dt = datetime.combine(date, time())

# insert into the table set at instantiation
# unique by the tables `{name}_must_be_different` and we always want to
# bump the value if that triggers a conflict
# the columns could differ per table… do we want an object to represent tables?
if kwargs:
extra_fields = ", " + ", ".join(kwargs.keys())
placeholders = ", " + ", ".join(["%s" for k in kwargs.keys()])
else:
extra_fields = ""
placeholders = ""
sql = f"""
INSERT INTO {self.table} (time, name, value {extra_fields})
VALUES (%s, %s, %s {placeholders})
ON CONFLICT ON CONSTRAINT {self.table}_must_be_different DO UPDATE SET value = EXCLUDED.value;
"""

run(sql, (dt, self.key, value, *kwargs.values()))

log.debug(
self.key,
date=dt.isoformat(),
value=value,
**kwargs,
# get the primary key name from the given table
constraint = inspect(self.engine).get_pk_constraint(self.table.name)["name"]

        # TODO: could we do all the rows at once in the values() call and
        # then use EXCLUDED to reference the value in the set_?
insert_stmt = (
insert(self.table)
.values(time=dt, value=value, **kwargs)
.on_conflict_do_update(
constraint=constraint,
set_={"value": value},
)
)

self.inserts.append(insert_stmt)

log.debug(insert_stmt)
2 changes: 1 addition & 1 deletion metrics/tools/dates.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ def date_from_iso(value):
if value is None:
return date.today()

return date.fromisoformat(value)
return datetime_from_iso(value).date()


def datetime_from_iso(value):
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ requires-python = ">=3.11"
dependencies = [
"click",
"github-to-sqlite",
"greenlet",
"requests",
"psycopg[binary]",
"slack-bolt",
"sqlalchemy[postgresql_psycopgbinary]",
"structlog",
]
dynamic = ["version"]
Expand Down
Loading

0 comments on commit 5e7278c

Please sign in to comment.