Skip to content

Commit

Permalink
Use only the FamilyGeography; abandon geography_id on family (#315)
Browse files Browse the repository at this point in the history
* partial changes - working pipeline tests

* bump version

* sort out tests

* remove geography_id from test setups

* Download of the whole DB now uses multi-geos

* Update app/db/crud/geography.py

Co-authored-by: Katy Baulch <[email protected]>

* Update app/db/crud/geography.py

Co-authored-by: Katy Baulch <[email protected]>

---------

Co-authored-by: Katy Baulch <[email protected]>
  • Loading branch information
diversemix and katybaulch authored Aug 28, 2024
1 parent 210c9aa commit 1903d3d
Show file tree
Hide file tree
Showing 14 changed files with 377 additions and 315 deletions.
21 changes: 8 additions & 13 deletions app/core/browse.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from typing import Optional, Sequence, cast

from db_client.models.dfce.family import Corpus, Family, FamilyCorpus, FamilyStatus
from db_client.models.dfce.geography import Geography
from db_client.models.organisation import Organisation
from pydantic import BaseModel
from sqlalchemy.orm import Session
Expand All @@ -17,6 +16,7 @@
SortField,
SortOrder,
)
from app.db.crud.geography import get_geo_subquery

_LOGGER = getLogger(__name__)

Expand All @@ -37,7 +37,7 @@ class BrowseArgs(BaseModel):

def to_search_response_family(
family: Family,
geography: Geography,
geography_value: str,
organisation: Organisation,
) -> SearchResponseFamily:
family_published_date = ""
Expand All @@ -56,7 +56,7 @@ def to_search_response_family(
family_date=family_published_date,
family_last_updated_date=family_last_updated_date,
family_source=cast(str, organisation.name),
family_geography=cast(str, geography.value),
family_geography=geography_value,
family_title_match=False,
family_description_match=False,
# ↓ Stuff we don't currently use for browse ↓
Expand All @@ -73,20 +73,15 @@ def browse_rds_families(
"""Browse RDS"""

t0 = perf_counter_ns()
geo_subquery = get_geo_subquery(db, req.geography_slugs, req.country_codes)
query = (
db.query(Family, Geography, Organisation)
.join(Geography, Family.geography_id == Geography.id)
db.query(Family, geo_subquery.c.value, Organisation) # type: ignore
.join(FamilyCorpus, FamilyCorpus.family_import_id == Family.import_id)
.join(Corpus, FamilyCorpus.corpus_import_id == Corpus.import_id)
.join(Organisation, Organisation.id == Corpus.organisation_id)
.filter(geo_subquery.c.family_import_id == Family.import_id) # type: ignore
)

if req.geography_slugs is not None:
query = query.filter(Geography.slug.in_(req.geography_slugs))

if req.country_codes is not None:
query = query.filter(Geography.value.in_(req.country_codes))

if req.categories is not None:
query = query.filter(Family.family_category.in_(req.categories))

Expand All @@ -98,8 +93,8 @@ def browse_rds_families(

_LOGGER.debug("Starting families query")
families = [
to_search_response_family(family, geography, organisation)
for (family, geography, organisation) in query.all()
to_search_response_family(family, geography_value, organisation)
for (family, geography_value, organisation) in query.all()
if family.family_status == FamilyStatus.PUBLISHED
]

Expand Down
237 changes: 9 additions & 228 deletions app/core/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,234 +16,15 @@
def create_query(ingest_cycle_start: str) -> str:
"""Browse RDS"""

query = (
"WITH "
"deduplicated_family_slugs as ("
"select "
"distinct ON (slug.family_import_id) "
"slug.family_import_id, slug.created, slug.name "
"from ("
"SELECT "
'slug.family_import_id as "family_import_id", '
"count(*) as count "
"from slug "
"where slug.family_import_id is not null "
"group by slug.family_import_id "
"having count(*) > 1"
") duplicates "
"left join slug "
"on duplicates.family_import_id = slug.family_import_id "
"order by slug.family_import_id desc, slug.created desc, slug.ctid desc "
"), "
"unique_family_slugs as ("
"select "
"distinct ON (slug.family_import_id) "
"slug.family_import_id, slug.created, slug.name "
"from ("
"SELECT "
'slug.family_import_id as "family_import_id", '
"count(*) as count "
"from slug "
"where slug.family_import_id is not null "
"group by slug.family_import_id "
"having count(*) = 1"
") non_duplicates "
"left join slug "
"on non_duplicates.family_import_id = slug.family_import_id "
"order by slug.family_import_id desc, slug.created desc, slug.ctid desc "
"), most_recent_family_slugs as ("
"select "
'deduplicated_family_slugs.family_import_id as "family_import_id", '
'deduplicated_family_slugs.created as "created", '
'deduplicated_family_slugs.name as "name" '
"from deduplicated_family_slugs "
"UNION ALL "
"select "
'unique_family_slugs.family_import_id as "family_import_id", '
'unique_family_slugs.created as "created", '
'unique_family_slugs.name as "name" '
"from unique_family_slugs "
"order by family_import_id desc, created desc "
"), deduplicated_doc_slugs as ("
"select "
"distinct ON (slug.family_document_import_id) "
"slug.family_document_import_id, "
"slug.created, "
"slug.name "
"from ("
"SELECT "
'slug.family_document_import_id as "family_document_import_id", '
"count(*) as count "
"from slug "
"where slug.family_document_import_id is not null "
"group by slug.family_document_import_id "
"having count(*) > 1"
") duplicates "
"left join slug "
"on duplicates.family_document_import_id = slug.family_document_import_id "
"order by "
"slug.family_document_import_id desc, slug.created desc, slug.ctid desc"
"), "
"unique_doc_slugs as ("
"select "
"distinct ON (slug.family_document_import_id) "
"slug.family_document_import_id, "
"slug.created, "
"slug.name "
"from ("
"SELECT "
'slug.family_document_import_id as "family_document_import_id", '
"count(*) as count "
"from slug "
"where slug.family_document_import_id is not null "
"group by slug.family_document_import_id "
"having count(*) = 1"
") non_duplicates "
"left join slug "
"on non_duplicates.family_document_import_id = slug.family_document_import_id "
"order by "
"slug.family_document_import_id desc, slug.created desc, slug.ctid desc"
"), most_recent_doc_slugs as ("
"select "
"deduplicated_doc_slugs.family_document_import_id "
'as "family_document_import_id", '
"deduplicated_doc_slugs.created, "
"deduplicated_doc_slugs.name "
"from deduplicated_doc_slugs "
"UNION ALL "
"select "
'unique_doc_slugs.family_document_import_id as "family_document_import_id", '
"unique_doc_slugs.created, "
"unique_doc_slugs.name "
"from unique_doc_slugs "
"order by family_document_import_id desc, created desc"
"), event_dates as ("
"select "
"family_event.family_import_id as family_import_id, "
"min(case "
"when family_event.event_type_name='Passed/Approved' then "
"family_event.date::date "
"else family_event.date::date "
"end) published_date, "
"max(family_event.date::date) last_changed "
"from family_event "
"group by family_import_id "
") "
"SELECT "
'ds.name as "Document ID", '
'p.title as "Document Title", '
'fs.name as "Family ID", '
'f.title as "Family Title", '
'f.description as "Family Summary", '
'n1.collection_titles as "Collection Title(s)", '
'n1.collection_descriptions as "Collection Description(s)", '
"INITCAP(d.valid_metadata::json#>>'{"
"role,0}') as "
'"Document Role", '
'd.variant_name as "Document Variant", '
'p.source_url as "Document Content URL", '
"INITCAP(d.valid_metadata::json#>>'{"
"type,0}') as "
'"Document Type", '
"CASE "
"WHEN f.family_category = 'UNFCCC' THEN 'UNFCCC' "
"ELSE INITCAP(f.family_category::TEXT) "
'END "Category", '
"array_to_string(ARRAY("
"SELECT jsonb_array_elements_text(fm.value->'framework')), ';') "
'as "Framework", '
'n2.language as "Language", '
'o.name as "Source", '
'g.value as "Geography ISO", '
'g.display_value as "Geography", '
"array_to_string(ARRAY("
"SELECT jsonb_array_elements_text(fm.value->'topic')), ';') "
'as "Topic/Response", '
"array_to_string(ARRAY("
"SELECT jsonb_array_elements_text(fm.value->'hazard')), ';') "
'as "Hazard", '
"array_to_string(ARRAY("
"SELECT jsonb_array_elements_text(fm.value->'sector')), ';') "
'as "Sector", '
"array_to_string(ARRAY("
"SELECT jsonb_array_elements_text(fm.value->'keyword')), ';') "
'as "Keyword", '
"array_to_string(ARRAY("
"SELECT jsonb_array_elements_text(fm.value->'instrument')), ';') "
'as "Instrument", '
"array_to_string(ARRAY("
"SELECT jsonb_array_elements_text(fm.value->'author')), ';') "
'as "Author", '
"array_to_string(ARRAY("
"SELECT jsonb_array_elements_text(fm.value->'author_type')), ';') "
'as "Author Type", '
'fp.published_date as "First event in timeline", '
'fp.last_changed as "Last event in timeline", '
'n3.event_type_names as "Full timeline of events (types)", '
'n3.event_dates as "Full timeline of events (dates)", '
'd.created::date as "Date Added to System", '
'f.last_modified::date as "Last Modified on System", '
'd.import_id as "Internal Document ID", '
'f.import_id as "Internal Family ID", '
'n1.collection_import_ids as "Internal Collection ID(s)" '
"FROM physical_document p "
"JOIN family_document d "
" ON p.id = d.physical_document_id "
"JOIN family f "
" ON d.family_import_id = f.import_id "
"inner join geography g "
" on g.id = f.geography_id "
"join family_corpus fc "
" on f.import_id = fc.family_import_id "
"join corpus c "
" on fc.corpus_import_id = c.import_id "
"join organisation o "
" on c.organisation_id = o.id "
"join family_metadata fm "
" on fm.family_import_id = f.import_id "
"FULL JOIN ("
" SELECT "
' collection_family.family_import_id as "family_import_id", '
"string_agg(collection.import_id, ';') AS collection_import_ids, "
"string_agg(collection.title, ';') AS collection_titles, "
"string_agg(collection.description, ';') AS collection_descriptions "
"FROM "
" collection "
"INNER JOIN collection_family "
"ON collection_family.collection_import_id = collection.import_id "
"GROUP BY collection_family.family_import_id "
") n1 ON n1.family_import_id=f.import_id "
"left JOIN ("
" SELECT "
' p.id as "id", '
"string_agg(l.name, ';' ORDER BY l.name) AS language "
"FROM physical_document p "
" left join physical_document_language pdl "
" on pdl.document_id = p.id "
" left join language l "
" on l.id = pdl.language_id "
" GROUP BY p.id "
") n2 ON n2.id=d.physical_document_id "
"FULL JOIN ("
" SELECT "
" family_event.family_import_id, "
"string_agg(family_event.import_id, ';') AS event_import_ids, "
"string_agg(family_event.title, ';') AS event_titles, "
"string_agg(family_event.event_type_name, ';') AS event_type_names, "
"string_agg(family_event.date::date::text, ';') AS event_dates "
"FROM family_event "
" INNER JOIN family ON family.import_id = family_event.family_import_id "
" GROUP BY family_event.family_import_id "
") n3 ON n3.family_import_id=f.import_id "
"LEFT JOIN most_recent_doc_slugs ds "
"on ds.family_document_import_id = d.import_id "
"LEFT JOIN most_recent_family_slugs fs on fs.family_import_id = f.import_id "
"LEFT JOIN event_dates fp on fp.family_import_id = f.import_id "
f"WHERE d.last_modified < '{ingest_cycle_start}' "
"ORDER BY "
"d.last_modified desc, d.created desc, d.ctid desc, n1.family_import_id"
)
return query
# Read the download.sql file
if create_query.cache is None: # type: ignore
with open("./app/core/download.sql", "r") as file:
create_query.cache = file.read() # type: ignore

return create_query.cache.replace("{ingest_cycle_start}", ingest_cycle_start) # type: ignore


create_query.cache = None # type: ignore


def get_whole_database_dump(ingest_cycle_start: str, db=Depends(get_db)):
Expand Down
Loading

0 comments on commit 1903d3d

Please sign in to comment.