Skip to content

Commit

Permalink
[Issue 2665] Analytics jobs conditionally use IAM token as Postgres pwd (#2799)
Browse files Browse the repository at this point in the history

## Summary
Fixes #2665 

### Time to review: __2 mins__

## Changes proposed
> What was added, updated, or removed in this PR.

Adds a switch in `analytics/integrations/db.py` that determines which
value to use for a DB password when connecting to Postgres: either the
value in `local.env` or an IAM token, depending on an environment
variable.

Also adds better exception handling, to make errors easier to spot in
CI.

## Context for reviewers
> Testing instructions, background context, more in-depth details of the
implementation, and anything else you'd like to call out or ask
reviewers. Explain how the changes were verified.

This is a follow up to previous PRs,
#2786 and
#2796, and part of an
effort to get `analytics` step functions to successfully connect to
Postgres DB in a CI environment.

## Additional information
> Screenshots, GIF demos, code examples or output to help show the
changes working as expected.
  • Loading branch information
DavidDudas-Intuitial authored Nov 9, 2024
1 parent ce2902c commit 56acd6d
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 23 deletions.
1 change: 1 addition & 0 deletions analytics/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class DBSettings(PydanticBaseEnvConfig):
slack_bot_token: str = Field(alias="ANALYTICS_SLACK_BOT_TOKEN")
reporting_channel_id: str = Field(alias="ANALYTICS_REPORTING_CHANNEL_ID")
aws_region: Optional[str] = Field(None, alias="AWS_REGION")
local_env: bool = True if os.getenv("ENVIRONMENT", "local") == "local" else False

def get_db_settings() -> DBSettings:
    """Build and return a DBSettings instance populated from the environment."""
    settings = DBSettings()
    return settings
2 changes: 1 addition & 1 deletion analytics/src/analytics/integrations/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def get_db() -> Engine:
"""
db = get_db_settings()
# inspired by simpler-grants-gov/blob/main/api/src/adapters/db/clients/postgres_client.py
token = generate_iam_auth_token(db) if db.password is None else db.password
token = db.password if db.local_env is True else generate_iam_auth_token(db)

return create_engine(
f"postgresql+psycopg://{db.user}:{token}@{db.db_host}:{db.port}",
Expand Down
10 changes: 8 additions & 2 deletions analytics/src/analytics/integrations/etldb/etldb.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Define EtlDb as an abstraction layer for database connections."""

import contextlib
from enum import Enum

from sqlalchemy import Connection
Expand All @@ -24,7 +25,11 @@ def __del__(self) -> None:
def connection(self) -> Connection:
    """Return a connection from the db engine, creating and caching it on first use.

    Subsequent calls return the cached connection instead of opening a new one.

    Raises:
        RuntimeError: If the engine fails to establish a connection; the
            underlying error is chained as the cause.
    """
    if self._connection is None:
        try:
            self._connection = self._db_engine.connect()
        except Exception as e:
            # Engine.connect() raises sqlalchemy exceptions (e.g.
            # OperationalError), not RuntimeError — catching only
            # RuntimeError here would never fire. Catch broadly and
            # normalize to RuntimeError so failures surface clearly in CI.
            message = f"Failed to connect to database: {e}"
            raise RuntimeError(message) from e
    return self._connection

def commit(self, connection: Connection) -> None:
Expand All @@ -33,7 +38,8 @@ def commit(self, connection: Connection) -> None:

def disconnect(self) -> None:
    """Dispose of the db engine's connection resources.

    Cleanup is best-effort: any error raised during disposal is swallowed,
    since nothing actionable can be done at teardown time.
    """
    try:
        self._db_engine.dispose()
    except Exception:
        # Deliberately suppressed — equivalent to contextlib.suppress(Exception).
        pass


class EtlChangeType(Enum):
Expand Down
66 changes: 46 additions & 20 deletions analytics/src/analytics/integrations/etldb/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,16 @@ def init_db() -> None:
sql = f.read()

# execute sql
db = EtlDb()
cursor = db.connection()
cursor.execute(
text(sql),
)
db.commit(cursor)
try:
db = EtlDb()
cursor = db.connection()
cursor.execute(
text(sql),
)
db.commit(cursor)
except RuntimeError as e:
message = f"Failed to initialize db: {e}"
raise RuntimeError(message) from e


def sync_db(dataset: EtlDataset, effective: str) -> None:
Expand All @@ -48,28 +52,50 @@ def sync_db(dataset: EtlDataset, effective: str) -> None:
db = EtlDb(effective)

# sync quad data to db resulting in row id for each quad
ghid_map[EtlEntityType.QUAD] = sync_quads(db, dataset)
print(f"quad row(s) processed: {len(ghid_map[EtlEntityType.QUAD])}")
try:
ghid_map[EtlEntityType.QUAD] = sync_quads(db, dataset)
print(f"quad row(s) processed: {len(ghid_map[EtlEntityType.QUAD])}")
except RuntimeError as e:
message = f"Failed to sync quad data: {e}"
raise RuntimeError(message) from e

# sync deliverable data to db resulting in row id for each deliverable
ghid_map[EtlEntityType.DELIVERABLE] = sync_deliverables(
db,
dataset,
ghid_map,
)
print(f"deliverable row(s) processed: {len(ghid_map[EtlEntityType.DELIVERABLE])}")
try:
ghid_map[EtlEntityType.DELIVERABLE] = sync_deliverables(
db,
dataset,
ghid_map,
)
print(
f"deliverable row(s) processed: {len(ghid_map[EtlEntityType.DELIVERABLE])}",
)
except RuntimeError as e:
message = f"Failed to sync deliverable data: {e}"
raise RuntimeError(message) from e

# sync sprint data to db resulting in row id for each sprint
ghid_map[EtlEntityType.SPRINT] = sync_sprints(db, dataset, ghid_map)
print(f"sprint row(s) processed: {len(ghid_map[EtlEntityType.SPRINT])}")
try:
ghid_map[EtlEntityType.SPRINT] = sync_sprints(db, dataset, ghid_map)
print(f"sprint row(s) processed: {len(ghid_map[EtlEntityType.SPRINT])}")
except RuntimeError as e:
message = f"Failed to sync sprint data: {e}"
raise RuntimeError(message) from e

# sync epic data to db resulting in row id for each epic
ghid_map[EtlEntityType.EPIC] = sync_epics(db, dataset, ghid_map)
print(f"epic row(s) processed: {len(ghid_map[EtlEntityType.EPIC])}")
try:
ghid_map[EtlEntityType.EPIC] = sync_epics(db, dataset, ghid_map)
print(f"epic row(s) processed: {len(ghid_map[EtlEntityType.EPIC])}")
except RuntimeError as e:
message = f"Failed to sync epic data: {e}"
raise RuntimeError(message) from e

# sync issue data to db resulting in row id for each issue
issue_map = sync_issues(db, dataset, ghid_map)
print(f"issue row(s) processed: {len(issue_map)}")
try:
issue_map = sync_issues(db, dataset, ghid_map)
print(f"issue row(s) processed: {len(issue_map)}")
except RuntimeError as e:
message = f"Failed to sync issue data: {e}"
raise RuntimeError(message) from e


def sync_deliverables(db: EtlDb, dataset: EtlDataset, ghid_map: dict) -> dict:
Expand Down

0 comments on commit 56acd6d

Please sign in to comment.