Merge branch 'main' into kai/cdn-part-2
coilysiren authored Dec 16, 2024
2 parents bb16df6 + 9ee9e4d commit 9777193
Showing 5 changed files with 137 additions and 164 deletions.
20 changes: 10 additions & 10 deletions .github/CODEOWNERS
@@ -1,11 +1,11 @@
 # We are limiting this to a few engineers and choosing not to select "Require review from Code Owners."
-* @acouch @chouinar @mdragon
-
-/documentation/ @widal001 @andycochran @mdragon
-/frontend/ @andycochran @acouch @doug-s-nava
-/api/ @chouinar @mdragon
-/analytics/ @coilysiren @widal001 @acouch
-
-/infra/ @coilysiren @acouch @mdragon
-/bin/ @coilysiren @acouch
-/.github/ @coilysiren @acouch
+# Alphabetize the folder names, alphabetize codeowner names.
+# Keep this document in sync with MAINTAINERS.md.
+* @mdragon # project lead
+/.github/ @acouch @coilysiren @mdragon # same as infra
+/analytics/ @acouch @coilysiren @widal001
+/api/ @chouinar @mdragon
+/bin/ @coilysiren @acouch @mdragon # same as infra
+/documentation/ @acouch @andycochran @btabaska @chouinar @coilysiren @doug-s-nava @mdragon @widal001 # everyone
+/frontend/ @acouch @andycochran @btabaska @doug-s-nava
+/infra/ @acouch @coilysiren @mdragon
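Note for reviewers unfamiliar with the format: GitHub resolves CODEOWNERS rules with last-match-wins precedence, so the "* @mdragon" fallback only applies to paths that none of the later, more specific directory patterns match. Keeping the file alphabetized, as the new comments require, makes that override order easy to audit.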
88 changes: 69 additions & 19 deletions api/src/search/backend/load_opportunities_to_index.py
@@ -10,6 +10,7 @@
 import src.adapters.db as db
 import src.adapters.search as search
 from src.api.opportunities_v1.opportunity_schemas import OpportunityV1Schema
+from src.db.models.agency_models import Agency
 from src.db.models.opportunity_models import (
     CurrentOpportunitySummary,
     Opportunity,
@@ -37,6 +38,7 @@ class LoadOpportunitiesToIndexConfig(PydanticBaseEnvConfig):
 class LoadOpportunitiesToIndex(Task):
     class Metrics(StrEnum):
         RECORDS_LOADED = "records_loaded"
+        TEST_RECORDS_SKIPPED = "test_records_skipped"

     def __init__(
         self,
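For context, the Task base class is not part of this diff; the Metrics StrEnum plus self.increment(...) appears to implement a named counter per task run. A minimal sketch of the assumed interface (names and behavior inferred, not the project's actual implementation):

```python
from enum import StrEnum


class Task:
    """Stand-in for the project's Task base class (assumed interface)."""

    def __init__(self) -> None:
        self.metrics: dict[str, int] = {}

    def increment(self, name: str, value: int = 1) -> None:
        # StrEnum members are strings, so they can key the metrics dict directly
        self.metrics[name] = self.metrics.get(name, 0) + value


class LoadOpportunitiesToIndex(Task):
    class Metrics(StrEnum):
        RECORDS_LOADED = "records_loaded"
        TEST_RECORDS_SKIPPED = "test_records_skipped"


task = LoadOpportunitiesToIndex()
task.increment(task.Metrics.TEST_RECORDS_SKIPPED)
assert task.metrics["test_records_skipped"] == 1
```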
@@ -72,6 +74,15 @@ def run_task(self) -> None:
     def incremental_updates_and_deletes(self) -> None:
         existing_opportunity_ids = self.fetch_existing_opportunity_ids_in_index()

+        # Handle updates/inserts
+        self._handle_incremental_upserts(existing_opportunity_ids)
+
+        # Handle deletes
+        self._handle_incremental_delete(existing_opportunity_ids)
+
+    def _handle_incremental_upserts(self, existing_opportunity_ids: set[int]) -> None:
+        """Handle updates/inserts of opportunities into the search index when running incrementally"""
+
         # Fetch opportunities that need processing from the queue
         queued_opportunities = (
             self.db_session.execute(
Expand Down Expand Up @@ -114,9 +125,42 @@ def incremental_updates_and_deletes(self) -> None:
loaded_ids = self.load_records(opportunities_to_index)
logger.info(f"Indexed {len(loaded_ids)} opportunities")

# Handle deletes - opportunities in search but not in our processed set
# and not in our database (or are drafts)
opportunity_ids_to_delete = existing_opportunity_ids - processed_opportunity_ids
# Clear processed / skipped entries from the queue
self.db_session.execute(
delete(OpportunitySearchIndexQueue).where(
OpportunitySearchIndexQueue.opportunity_id.in_(processed_opportunity_ids)
)
)

def _handle_incremental_delete(self, existing_opportunity_ids: set[int]) -> None:
"""Handle deletion of opportunities when running incrementally
Scenarios in which we delete an opportunity from the index:
* An opportunity is no longer in our database
* An opportunity is a draft (unlikely to ever happen, would require published->draft)
* An opportunity loses its opportunity status
* An opportunity has a test agency
"""

# Fetch the opportunity IDs of opportunities we would expect to be in the index
opportunity_ids_we_want_in_search: set[int] = set(
self.db_session.execute(
select(Opportunity.opportunity_id)
.join(CurrentOpportunitySummary)
.join(Agency, Opportunity.agency_code == Agency.agency_code, isouter=True)
.where(
Opportunity.is_draft.is_(False),
CurrentOpportunitySummary.opportunity_status.isnot(None),
# We treat a null agency as fine
# We only want to filter out if is_test_agency=True specifically
Agency.is_test_agency.isnot(True),
)
)
.scalars()
.all()
)

opportunity_ids_to_delete = existing_opportunity_ids - opportunity_ids_we_want_in_search

for opportunity_id in opportunity_ids_to_delete:
logger.info(
@@ -127,15 +171,6 @@ def incremental_updates_and_deletes(self) -> None:
         if opportunity_ids_to_delete:
             self.search_client.bulk_delete(self.index_name, opportunity_ids_to_delete)

-        # Clear processed entries from the queue
-        if processed_opportunity_ids:
-            self.db_session.execute(
-                delete(OpportunitySearchIndexQueue).where(
-                    OpportunitySearchIndexQueue.opportunity_id.in_(processed_opportunity_ids)
-                )
-            )
-            self.db_session.commit()
-
     def full_refresh(self) -> None:
         # create the index
         self.search_client.create_index(
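The new _handle_incremental_delete is a reconciliation pass: compute the set of opportunity IDs that should be in the index, then bulk-delete whatever the index holds beyond that. A minimal sketch of the core set logic, with plain sets standing in for the search client and the DB query:

```python
def reconcile_deletes(existing_in_index: set[int], wanted_in_index: set[int]) -> set[int]:
    """Return IDs present in the search index that no longer qualify: deleted rows,
    drafts, opportunities without a status, or test-agency opportunities."""
    return existing_in_index - wanted_in_index


# e.g. opportunity 3 was deleted from the DB and 4 now belongs to a test agency
assert reconcile_deletes({1, 2, 3, 4}, {1, 2}) == {3, 4}
```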
@@ -171,6 +206,13 @@ def fetch_opportunities(self) -> Iterator[Sequence[Opportunity]]:
                     CurrentOpportunitySummary.opportunity_status.isnot(None),
                 )
                 .options(selectinload("*"), noload(Opportunity.all_opportunity_summaries))
+                # The top-level agency won't be automatically fetched up front unless we add this,
+                # due to the odd nature of the relationship we have set up for the agency table.
+                # Adding it here improves performance when serializing to JSON, as we won't need
+                # to call out to the DB repeatedly.
+                .options(
+                    selectinload(Opportunity.agency_record).selectinload(Agency.top_level_agency)
+                )
                 .execution_options(yield_per=1000)
             )
             .scalars()
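The extra .options(...) call exists to avoid an N+1 query pattern: without it, each access to record.agency_record.top_level_agency during schema.dump() could lazy-load from the database, one round trip per opportunity. A sketch of the query shape this produces, reusing the diff's own models (import paths taken from the diff):

```python
from sqlalchemy import select
from sqlalchemy.orm import selectinload

from src.db.models.agency_models import Agency
from src.db.models.opportunity_models import Opportunity

# selectinload emits one additional SELECT ... WHERE id IN (...) per relationship
# level, so agencies and their top-level agencies load in two extra queries total,
# regardless of how many opportunities the main query streams.
stmt = (
    select(Opportunity)
    .options(selectinload(Opportunity.agency_record).selectinload(Agency.top_level_agency))
    .execution_options(yield_per=1000)
)
```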
@@ -204,13 +246,21 @@ def load_records(self, records: Sequence[Opportunity]) -> set[int]:
         loaded_opportunity_ids = set()

         for record in records:
-            logger.info(
-                "Preparing opportunity for upload to search index",
-                extra={
-                    "opportunity_id": record.opportunity_id,
-                    "opportunity_status": record.opportunity_status,
-                },
-            )
+            log_extra = {
+                "opportunity_id": record.opportunity_id,
+                "opportunity_status": record.opportunity_status,
+            }
+            logger.info("Preparing opportunity for upload to search index", extra=log_extra)
+
+            # If the opportunity has a test agency, skip uploading it to the index
+            if record.agency_record and record.agency_record.is_test_agency:
+                logger.info(
+                    "Skipping upload of opportunity as agency is a test agency",
+                    extra=log_extra | {"agency": record.agency_code},
+                )
+                self.increment(self.Metrics.TEST_RECORDS_SKIPPED)
+                continue
+
             json_records.append(schema.dump(record))
             self.increment(self.Metrics.RECORDS_LOADED)

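One small detail in the new logging code above: log_extra | {...} is the dict union operator (Python 3.9+). It returns a new dict and leaves log_extra untouched, so the skip-log line can add an agency field without leaking it into later log calls:

```python
log_extra = {"opportunity_id": 123, "opportunity_status": "posted"}
skip_extra = log_extra | {"agency": "MY-TEST-AGENCY"}  # right-hand side wins on key conflicts

assert skip_extra["agency"] == "MY-TEST-AGENCY"
assert "agency" not in log_extra  # the original mapping is unchanged
```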
60 changes: 55 additions & 5 deletions api/tests/src/search/backend/test_load_opportunities_to_index.py
@@ -1,3 +1,5 @@
+import itertools
+
 import pytest

 from src.db.models.opportunity_models import OpportunitySearchIndexQueue
@@ -7,7 +9,11 @@
 )
 from src.util.datetime_util import get_now_us_eastern_datetime
 from tests.conftest import BaseTestClass
-from tests.src.db.models.factories import OpportunityFactory, OpportunitySearchIndexQueueFactory
+from tests.src.db.models.factories import (
+    AgencyFactory,
+    OpportunityFactory,
+    OpportunitySearchIndexQueueFactory,
+)


 class TestLoadOpportunitiesToIndexFullRefresh(BaseTestClass):
@@ -26,22 +32,34 @@ def test_load_opportunities_to_index(
         opportunity_index_alias,
         load_opportunities_to_index,
     ):
+        # Create an agency that some records will be connected to
+        agency = AgencyFactory.create(agency_code="FUN-AGENCY", is_test_agency=False)
+
         # Create 25 opportunities we will load into the search index
         opportunities = []
-        opportunities.extend(OpportunityFactory.create_batch(size=6, is_posted_summary=True))
+        opportunities.extend(
+            OpportunityFactory.create_batch(
+                size=6, is_posted_summary=True, agency_code=agency.agency_code
+            )
+        )
         opportunities.extend(OpportunityFactory.create_batch(size=3, is_forecasted_summary=True))
         opportunities.extend(OpportunityFactory.create_batch(size=2, is_closed_summary=True))
         opportunities.extend(
             OpportunityFactory.create_batch(size=8, is_archived_non_forecast_summary=True)
         )
         opportunities.extend(
-            OpportunityFactory.create_batch(size=6, is_archived_forecast_summary=True)
+            OpportunityFactory.create_batch(
+                size=6, is_archived_forecast_summary=True, agency_code=agency.agency_code
+            )
         )

         # Create some opportunities that won't get fetched / loaded into search
         OpportunityFactory.create_batch(size=3, is_draft=True)
         OpportunityFactory.create_batch(size=4, no_current_summary=True)

+        AgencyFactory.create(agency_code="MY-TEST-AGENCY", is_test_agency=True)
+        OpportunityFactory.create_batch(size=3, agency_code="MY-TEST-AGENCY")
+
         for opportunity in opportunities:
             OpportunitySearchIndexQueueFactory.create(
                 opportunity=opportunity,
Expand Down Expand Up @@ -76,6 +94,16 @@ def test_load_opportunities_to_index(
[record["opportunity_id"] for record in resp.records]
)

assert load_opportunities_to_index.metrics[
load_opportunities_to_index.Metrics.RECORDS_LOADED
] == len(opportunities)
assert (
load_opportunities_to_index.metrics[
load_opportunities_to_index.Metrics.TEST_RECORDS_SKIPPED
]
== 3
)

# Rerunning but first add a few more opportunities to show up
opportunities.extend(OpportunityFactory.create_batch(size=3))
load_opportunities_to_index.index_name = (
Expand Down Expand Up @@ -129,7 +157,10 @@ def test_load_opportunities_to_index(
OpportunityFactory.create_batch(size=6, is_archived_forecast_summary=True)
)

for opportunity in opportunities:
AgencyFactory.create(agency_code="MY-TEST-AGENCY-123", is_test_agency=True)
test_opps = OpportunityFactory.create_batch(size=2, agency_code="MY-TEST-AGENCY-123")

for opportunity in itertools.chain(opportunities, test_opps):
OpportunitySearchIndexQueueFactory.create(
opportunity=opportunity,
)
@@ -139,6 +170,13 @@
         resp = search_client.search(opportunity_index_alias, {"size": 100})
         assert resp.total_records == len(opportunities)

+        assert load_opportunities_to_index.metrics[
+            load_opportunities_to_index.Metrics.RECORDS_LOADED
+        ] == len(opportunities)
+        assert load_opportunities_to_index.metrics[
+            load_opportunities_to_index.Metrics.TEST_RECORDS_SKIPPED
+        ] == len(test_opps)
+
         # Add a few more opportunities that will be created
         opportunities.extend(OpportunityFactory.create_batch(size=3, is_posted_summary=True))

@@ -147,15 +185,27 @@ def test_load_opportunities_to_index(
         for opportunity in opportunities_to_delete:
             db_session.delete(opportunity)

+        # Change the agency on a few to a test agency to delete them
+        opportunities_now_with_test_agency = opportunities[0:3]
+        for opportunity in opportunities_now_with_test_agency:
+            opportunity.agency_code = "MY-TEST-AGENCY-123"
+
         for opportunity in opportunities:
             OpportunitySearchIndexQueueFactory.create(
                 opportunity=opportunity,
             )

         db_session.commit()
+        db_session.expunge_all()
         load_opportunities_to_index.run()

         resp = search_client.search(opportunity_index_alias, {"size": 100})
-        assert resp.total_records == len(opportunities)
+        assert resp.total_records == len(opportunities) - 3  # test agency opportunities excluded
+
+        # Running one last time without any changes should be fine as well
+        load_opportunities_to_index.run()
+        resp = search_client.search(opportunity_index_alias, {"size": 100})
+        assert resp.total_records == len(opportunities) - 3
+
     def test_load_opportunities_to_index_index_does_not_exist(self, db_session, search_client):
         config = LoadOpportunitiesToIndexConfig(
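A note on the db_session.expunge_all() added before the final run (a reading inferred from the diff, not stated in it): the test mutates agency_code on already-persisted opportunities, and expunging detaches every object from the session so the task's queries see the committed rows rather than state cached in the session's identity map:

```python
db_session.commit()       # persist the agency_code changes made above
db_session.expunge_all()  # detach all ORM objects so later queries reload fresh state
load_opportunities_to_index.run()
```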