Commit 36aceba
… 2317/copy_opp_data_to_analytics
babebe committed Dec 16, 2024
2 parents a56db5b + 9ee9e4d
Showing 39 changed files with 446 additions and 303 deletions.
20 changes: 10 additions & 10 deletions .github/CODEOWNERS
@@ -1,11 +1,11 @@
# We are limiting this to a few engineers and choosing not to select "Require review from Code Owners."
* @acouch @chouinar @mdragon
# Alphabetize the folder names, alphabetize codeowner names.
# Keep this document in sync with MAINTAINERS.md.

/documentation/ @widal001 @andycochran @mdragon
/frontend/ @andycochran @acouch @doug-s-nava
/api/ @chouinar @mdragon
/analytics/ @coilysiren @widal001 @acouch

/infra/ @coilysiren @acouch @mdragon
/bin/ @coilysiren @acouch
/.github/ @coilysiren @acouch
* @mdragon # project lead
/.github/ @acouch @coilysiren @mdragon # same as infra
/analytics/ @acouch @coilysiren @widal001
/api/ @chouinar @mdragon
/bin/ @coilysiren @acouch @mdragon # same as infra
/documentation/ @acouch @andycochran @btabaska @chouinar @coilysiren @doug-s-nava @mdragon @widal001 # everyone
/frontend/ @acouch @andycochran @btabaska @doug-s-nava
/infra/ @acouch @coilysiren @mdragon
12 changes: 10 additions & 2 deletions api/src/api/users/user_routes.py
@@ -19,7 +19,10 @@
from src.auth.login_gov_jwt_auth import get_final_redirect_uri, get_login_gov_redirect_uri
from src.db.models.user_models import UserTokenSession
from src.services.users.get_user import get_user
from src.services.users.login_gov_callback_handler import handle_login_gov_callback
from src.services.users.login_gov_callback_handler import (
handle_login_gov_callback_request,
handle_login_gov_token,
)

logger = logging.getLogger(__name__)

@@ -53,8 +56,13 @@ def user_login(db_session: db.Session) -> flask.Response:
def user_login_callback(db_session: db.Session, query_data: dict) -> flask.Response:
logger.info("GET /v1/users/login/callback")

# We process this in two separate DB transactions
# as we delete the state at the end of the first handler,
# even if a later step errors, in order to avoid replay attacks
with db_session.begin():
data = handle_login_gov_callback_request(query_data, db_session)
with db_session.begin():
result = handle_login_gov_callback(query_data, db_session)
result = handle_login_gov_token(db_session, data)

# Redirect to the final location for the user
return response.redirect_response(
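The new comment explains the shape of the route: the callback work is split across two database transactions so the single-use OAuth state is deleted and committed in the first one, and a replayed callback can no longer find it even if the token exchange in the second one fails. A condensed sketch of that flow, using the two handlers imported above but omitting the Flask wiring and the final redirect, might look like this (following the repo's `src.adapters.db` import pattern):

```python
import src.adapters.db as db
from src.services.users.login_gov_callback_handler import (
    LoginGovCallbackResponse,
    handle_login_gov_callback_request,
    handle_login_gov_token,
)


def callback_flow_sketch(db_session: db.Session, query_data: dict) -> LoginGovCallbackResponse:
    """Sketch only - the real route also handles error redirects and the final response."""
    # Transaction 1: validate the redirect params and delete the stored OAuth
    # state. This commits on its own, so the single-use state is gone even if
    # the token exchange below raises.
    with db_session.begin():
        data = handle_login_gov_callback_request(query_data, db_session)

    # Transaction 2: exchange the authorization code for an id_token, validate
    # it (including the nonce), and create or update the user.
    with db_session.begin():
        return handle_login_gov_token(db_session, data)
```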
1 change: 1 addition & 0 deletions api/src/auth/api_jwt_auth.py
@@ -90,6 +90,7 @@ def create_jwt_for_user(
"iat": current_time,
"aud": config.audience,
"iss": config.issuer,
"user_id": str(user.user_id),
}

logger.info(
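The single added line puts the user's ID into the JWT claims, so anything that decodes the token can identify the user directly rather than re-deriving it from the sub claim. A minimal decoding sketch with PyJWT; the key, algorithm, audience, and issuer values here are placeholders, since the signing configuration sits outside this hunk:

```python
import jwt  # PyJWT


def read_user_id(token: str, public_key: str, audience: str, issuer: str) -> str:
    """Decode an API token and return the user_id claim added by this change."""
    payload = jwt.decode(
        token,
        public_key,
        algorithms=["RS256"],  # assumption: match whatever algorithm the config actually uses
        audience=audience,
        issuer=issuer,
    )
    # The claim is stored as str(user.user_id), so it comes back as a string (e.g. a UUID).
    return payload["user_id"]
```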
12 changes: 8 additions & 4 deletions api/src/auth/login_gov_jwt_auth.py
@@ -195,7 +195,7 @@ def get_final_redirect_uri(
return f"{config.login_final_destination}?{encoded_params}"


def validate_token(token: str, config: LoginGovConfig | None = None) -> LoginGovUser:
def validate_token(token: str, nonce: str, config: LoginGovConfig | None = None) -> LoginGovUser:
if not config:
config = get_config()

@@ -205,22 +205,22 @@ def validate_token(token: str, config: LoginGovConfig | None = None) -> LoginGov
# Iterate over the public keys we have and check each
# to determine if we have a valid key.
for public_key in config.public_keys:
user = _validate_token_with_key(token, public_key, config)
user = _validate_token_with_key(token, nonce, public_key, config)
if user is not None:
return user

_refresh_keys(config)

for public_key in config.public_keys:
user = _validate_token_with_key(token, public_key, config)
user = _validate_token_with_key(token, nonce, public_key, config)
if user is not None:
return user

raise JwtValidationError("Token could not be validated against any public keys from login.gov")


def _validate_token_with_key(
token: str, public_key: jwt.PyJWK | str, config: LoginGovConfig
token: str, nonce: str, public_key: jwt.PyJWK | str, config: LoginGovConfig
) -> LoginGovUser | None:
# We are processing the id_token as described on:
# https://developers.login.gov/oidc/token/#token-response
@@ -244,6 +244,10 @@ def _validate_token_with_key(
)
payload = data.get("payload", {})

payload_nonce = payload.get("nonce", None)
if payload_nonce != nonce:
raise JwtValidationError("Nonce does not match expected")

user_id = payload["sub"]
email = payload["email"]

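validate_token and _validate_token_with_key now require the nonce that was generated when the login flow started, and any id_token whose nonce claim differs is rejected. This is the standard OIDC protection tying the token back to the original authorization request. A usage sketch from the caller's side, assuming JwtValidationError is importable from the same module (the nonce value itself would come from the stored login state):

```python
from src.auth.login_gov_jwt_auth import JwtValidationError, validate_token


def validate_id_token_sketch(id_token: str, expected_nonce: str):
    """Sketch: callers now pass the nonce they recorded when redirecting to login.gov."""
    try:
        # Raises JwtValidationError if no public key verifies the token *or*
        # if the token's nonce claim does not match expected_nonce.
        return validate_token(id_token, expected_nonce)
    except JwtValidationError:
        # In the real flow this surfaces as a 401 via the callback handler.
        raise
```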
88 changes: 69 additions & 19 deletions api/src/search/backend/load_opportunities_to_index.py
@@ -10,6 +10,7 @@
import src.adapters.db as db
import src.adapters.search as search
from src.api.opportunities_v1.opportunity_schemas import OpportunityV1Schema
from src.db.models.agency_models import Agency
from src.db.models.opportunity_models import (
CurrentOpportunitySummary,
Opportunity,
@@ -37,6 +38,7 @@ class LoadOpportunitiesToIndexConfig(PydanticBaseEnvConfig):
class LoadOpportunitiesToIndex(Task):
class Metrics(StrEnum):
RECORDS_LOADED = "records_loaded"
TEST_RECORDS_SKIPPED = "test_records_skipped"

def __init__(
self,
@@ -72,6 +74,15 @@ def run_task(self) -> None:
def incremental_updates_and_deletes(self) -> None:
existing_opportunity_ids = self.fetch_existing_opportunity_ids_in_index()

# Handle updates/inserts
self._handle_incremental_upserts(existing_opportunity_ids)

# Handle deletes
self._handle_incremental_delete(existing_opportunity_ids)

def _handle_incremental_upserts(self, existing_opportunity_ids: set[int]) -> None:
"""Handle updates/inserts of opportunities into the search index when running incrementally"""

# Fetch opportunities that need processing from the queue
queued_opportunities = (
self.db_session.execute(
@@ -114,9 +125,42 @@ def incremental_updates_and_deletes(self) -> None:
loaded_ids = self.load_records(opportunities_to_index)
logger.info(f"Indexed {len(loaded_ids)} opportunities")

# Handle deletes - opportunities in search but not in our processed set
# and not in our database (or are drafts)
opportunity_ids_to_delete = existing_opportunity_ids - processed_opportunity_ids
# Clear processed / skipped entries from the queue
self.db_session.execute(
delete(OpportunitySearchIndexQueue).where(
OpportunitySearchIndexQueue.opportunity_id.in_(processed_opportunity_ids)
)
)

def _handle_incremental_delete(self, existing_opportunity_ids: set[int]) -> None:
"""Handle deletion of opportunities when running incrementally
Scenarios in which we delete an opportunity from the index:
* An opportunity is no longer in our database
* An opportunity is a draft (unlikely to ever happen, would require published->draft)
* An opportunity loses its opportunity status
* An opportunity has a test agency
"""

# Fetch the opportunity IDs of opportunities we would expect to be in the index
opportunity_ids_we_want_in_search: set[int] = set(
self.db_session.execute(
select(Opportunity.opportunity_id)
.join(CurrentOpportunitySummary)
.join(Agency, Opportunity.agency_code == Agency.agency_code, isouter=True)
.where(
Opportunity.is_draft.is_(False),
CurrentOpportunitySummary.opportunity_status.isnot(None),
# We treat a null agency as fine
# We only want to filter out if is_test_agency=True specifically
Agency.is_test_agency.isnot(True),
)
)
.scalars()
.all()
)

opportunity_ids_to_delete = existing_opportunity_ids - opportunity_ids_we_want_in_search

for opportunity_id in opportunity_ids_to_delete:
logger.info(
@@ -127,15 +171,6 @@ def incremental_updates_and_deletes(self) -> None:
if opportunity_ids_to_delete:
self.search_client.bulk_delete(self.index_name, opportunity_ids_to_delete)

# Clear processed entries from the queue
if processed_opportunity_ids:
self.db_session.execute(
delete(OpportunitySearchIndexQueue).where(
OpportunitySearchIndexQueue.opportunity_id.in_(processed_opportunity_ids)
)
)
self.db_session.commit()

def full_refresh(self) -> None:
# create the index
self.search_client.create_index(
@@ -171,6 +206,13 @@ def fetch_opportunities(self) -> Iterator[Sequence[Opportunity]]:
CurrentOpportunitySummary.opportunity_status.isnot(None),
)
.options(selectinload("*"), noload(Opportunity.all_opportunity_summaries))
# Top level agency won't be automatically fetched up front unless we add this
# due to the odd nature of the relationship we have set up for the agency table
# Adding it here improves performance when serializing to JSON as we won't need to
# call out to the DB repeatedly.
.options(
selectinload(Opportunity.agency_record).selectinload(Agency.top_level_agency)
)
.execution_options(yield_per=1000)
)
.scalars()
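The comment above explains why the extra .options(...) call is there: the agency's top-level parent isn't picked up by the existing eager-load options, so without it each record could trigger an additional query while being serialized to JSON. Chaining selectinload through Opportunity.agency_record into Agency.top_level_agency fetches those rows in batched IN queries up front. A rough standalone illustration of the two query shapes (not the task's actual query):

```python
from sqlalchemy import select
from sqlalchemy.orm import selectinload

from src.db.models.agency_models import Agency
from src.db.models.opportunity_models import Opportunity

# Without the chained option, opportunity.agency_record.top_level_agency can fall
# back to lazy loading - one extra SELECT per opportunity during serialization.
base_query = select(Opportunity)

# With it, the agency rows and their top-level parents are pulled in with a small
# number of batched "SELECT ... WHERE id IN (...)" statements instead.
eager_query = select(Opportunity).options(
    selectinload(Opportunity.agency_record).selectinload(Agency.top_level_agency)
)
```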
@@ -204,13 +246,21 @@ def load_records(self, records: Sequence[Opportunity]) -> set[int]:
loaded_opportunity_ids = set()

for record in records:
logger.info(
"Preparing opportunity for upload to search index",
extra={
"opportunity_id": record.opportunity_id,
"opportunity_status": record.opportunity_status,
},
)
log_extra = {
"opportunity_id": record.opportunity_id,
"opportunity_status": record.opportunity_status,
}
logger.info("Preparing opportunity for upload to search index", extra=log_extra)

# If the opportunity has a test agency, skip uploading it to the index
if record.agency_record and record.agency_record.is_test_agency:
logger.info(
"Skipping upload of opportunity as agency is a test agency",
extra=log_extra | {"agency": record.agency_code},
)
self.increment(self.Metrics.TEST_RECORDS_SKIPPED)
continue

json_records.append(schema.dump(record))
self.increment(self.Metrics.RECORDS_LOADED)

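Taken together, the changes to this file do two things when the loader runs incrementally: load_records now skips (and counts, via TEST_RECORDS_SKIPPED) any opportunity whose agency is flagged is_test_agency, and the delete pass becomes a set difference between what is currently in the index and what the database says should be there (non-draft, still has a status, not owned by a test agency). That delete rule reduces to plain set arithmetic, sketched here with made-up IDs:

```python
def ids_to_delete(ids_in_index: set[int], ids_we_want_in_search: set[int]) -> set[int]:
    """Anything sitting in the index that no longer qualifies gets removed."""
    return ids_in_index - ids_we_want_in_search


# Hypothetical example: the index holds 1-4, but the database says only 1 and 3
# still qualify (say 2 lost its opportunity status and 4 belongs to a test agency).
assert ids_to_delete({1, 2, 3, 4}, {1, 3}) == {2, 4}
```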
40 changes: 32 additions & 8 deletions api/src/services/users/login_gov_callback_handler.py
@@ -26,6 +26,14 @@ class CallbackParams(BaseModel):
error_description: str | None = None


@dataclass
class LoginGovDataContainer:
"""Holds various login gov related fields we want to pass around"""

code: str
nonce: str


@dataclass
class LoginGovCallbackResponse:
token: str
@@ -37,13 +45,14 @@ def get_login_gov_client() -> LoginGovOauthClient:
return LoginGovOauthClient()


def handle_login_gov_callback(query_data: dict, db_session: db.Session) -> LoginGovCallbackResponse:
def handle_login_gov_callback_request(
query_data: dict, db_session: db.Session
) -> LoginGovDataContainer:
"""Handle the callback from login.gov after calling the authenticate endpoint
NOTE: Any errors thrown here will actually lead to a redirect due to the
with_login_redirect_error_handler handler we have attached to the route
"""

# Process the data coming back from login.gov via the redirect query params
# see: https://developers.login.gov/oidc/authorization/#authorization-response
callback_params = CallbackParams.model_validate(query_data)
@@ -75,29 +84,44 @@ def handle_login_gov_callback(query_data: dict, db_session: db.Session) -> Login
if login_gov_state is None:
raise_flask_error(404, "OAuth state not found")

# We do not want the login_gov_state to be reusable - so delete it
# even if we later error to avoid any replay attacks.
db_session.delete(login_gov_state)

return LoginGovDataContainer(code=callback_params.code, nonce=str(login_gov_state.nonce))


def handle_login_gov_token(
db_session: db.Session, login_gov_data: LoginGovDataContainer
) -> LoginGovCallbackResponse:
"""Fetch user info from login gov, and handle user creation
NOTE: Any errors thrown here will actually lead to a redirect due to the
with_login_redirect_error_handler handler we have attached to the route
"""

# call the token endpoint (make a client)
# https://developers.login.gov/oidc/token/
client = get_login_gov_client()
response = client.get_token(
OauthTokenRequest(
code=callback_params.code, client_assertion=get_login_gov_client_assertion()
code=login_gov_data.code, client_assertion=get_login_gov_client_assertion()
)
)

# If this request failed, we'll assume we're the issue and 500
# TODO - need to test with actual login.gov if there could be other scenarios
# the mock always returns something as long as the request is well-formatted
if response.is_error_response():
raise_flask_error(500, response.error_description)

# Process the token response from login.gov
return _process_token(db_session, response.id_token)
# which will create/update a user in the DB
return _process_token(db_session, response.id_token, login_gov_data.nonce)


def _process_token(db_session: db.Session, token: str) -> LoginGovCallbackResponse:
def _process_token(db_session: db.Session, token: str, nonce: str) -> LoginGovCallbackResponse:
"""Process the token from login.gov and generate our own token for auth"""
try:
login_gov_user = validate_token(token)
login_gov_user = validate_token(token, nonce)
except JwtValidationError as e:
logger.info("Login.gov token validation failed", extra={"auth.issue": e.message})
raise_flask_error(401, e.message)
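The refactor splits the old handle_login_gov_callback into two pieces and threads the nonce from the stored state through to token validation by way of the new LoginGovDataContainer. A rough sketch of what the two phases exchange; the values here are placeholders, since the real ones come from the callback params and the LoginGovState row:

```python
from src.services.users.login_gov_callback_handler import LoginGovDataContainer

# Phase 1 (handle_login_gov_callback_request): after validating the callback and
# deleting the single-use state row, it hands back the authorization code plus the
# nonce that was recorded when the user was sent to login.gov.
login_gov_data = LoginGovDataContainer(
    code="placeholder-authorization-code",
    nonce="placeholder-nonce-from-login-gov-state",
)

# Phase 2 (handle_login_gov_token): uses .code for the token request and passes
# .nonce down through _process_token() to validate_token(), which rejects any
# id_token whose nonce claim does not match.
assert login_gov_data.code and login_gov_data.nonce
```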