Skip to content

Commit

Permalink
Make ordering of db objects the same
Browse files Browse the repository at this point in the history
  • Loading branch information
katybaulch committed Oct 17, 2024
1 parent 4322037 commit 0b0a8ce
Showing 1 changed file with 6 additions and 66 deletions.
72 changes: 6 additions & 66 deletions app/core/ingestion/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,65 +24,6 @@
MetadataType = dict[str, list[str]]


def fetch_family_and_metadata(
    db: Session,
) -> Sequence[Tuple[Family, FamilyMetadata, Organisation, Corpus]]:
    """Build a subquery joining families to their metadata, org & corpus.

    The query is *not* executed here: the function ends with
    ``.subquery()``, so the caller receives a selectable that is meant
    to be embedded in (joined from) another query rather than a
    materialised list of rows.

    The subquery selects the columns of Family, FamilyMetadata,
    Organisation and Corpus, plus the family import id exposed under
    the label ``family_import_id``.

    All joins use ``Query.join`` with its default (inner) join
    behaviour, so a family is only present when it has a metadata row
    and a FamilyCorpus link to a corpus with an organisation. No
    ``DISTINCT`` is applied.

    NOTE(review): the declared return annotation describes a sequence
    of tuples, which does not match the subquery object that is
    actually returned. It is left unchanged here to keep the signature
    stable -- confirm and correct alongside the module imports.

    :param Session db: The db session to query against.
    :return: A SQLAlchemy subquery construct (see NOTE above regarding
        the annotation).
    """
    _LOGGER.info("Running pipeline family query")

    family_query = (
        db.query(
            # Labelled so that an outer query has a stable column name
            # to reference on the subquery.
            Family.import_id.label("family_import_id"),
            Family,
            FamilyMetadata,
            Organisation,
            Corpus,
        )
        .join(FamilyMetadata, Family.import_id == FamilyMetadata.family_import_id)
        # FamilyCorpus is the link table between a family and its corpus.
        .join(FamilyCorpus, Family.import_id == FamilyCorpus.family_import_id)
        .join(Corpus, Corpus.import_id == FamilyCorpus.corpus_import_id)
        .join(Organisation, Corpus.organisation_id == Organisation.id)
    )
    return family_query.subquery()


def fetch_documents(db: Session) -> Sequence[Tuple[FamilyDocument, PhysicalDocument]]:
    """Build a subquery of non-deleted documents and their physical docs.

    The query is *not* executed here: the function ends with
    ``.subquery()``, so the caller receives a selectable that is meant
    to be embedded in (joined from) another query rather than a
    materialised list of rows.

    The subquery selects the columns of FamilyDocument and
    PhysicalDocument, plus the document import id exposed under the
    label ``doc_import_id``. Rows whose ``document_status`` equals
    ``DocumentStatus.DELETED`` are filtered out.

    The join uses ``Query.join`` with its default (inner) join
    behaviour, so a family document without a matching physical
    document is not included. No ``DISTINCT`` is applied.

    NOTE(review): the declared return annotation describes a sequence
    of tuples, which does not match the subquery object that is
    actually returned. It is left unchanged here to keep the signature
    stable -- confirm and correct alongside the module imports.

    :param Session db: The db session to query against.
    :return: A SQLAlchemy subquery construct (see NOTE above regarding
        the annotation).
    """
    _LOGGER.info("Running pipeline document query")

    document_query = (
        db.query(
            # Labelled so that an outer query has a stable column name
            # to reference on the subquery.
            FamilyDocument.import_id.label("doc_import_id"),
            FamilyDocument,
            PhysicalDocument,
        )
        .join(
            PhysicalDocument,
            PhysicalDocument.id == FamilyDocument.physical_document_id,
        )
        .filter(FamilyDocument.document_status != DocumentStatus.DELETED)
    )
    return document_query.subquery()


def fetch_geographies(db: Session) -> Sequence[Tuple[str, list[str]]]:
"""Fetch unique geographies associated with each family.
Expand Down Expand Up @@ -111,13 +52,13 @@ def generate_pipeline_ingest_input_query(
db: Session,
) -> Sequence[
Tuple[
FamilyDocument,
Family,
FamilyDocument,
FamilyMetadata,
list[str],
Organisation,
Corpus,
PhysicalDocument,
list[str],
]
]:
"""Get a list of non-deleted docs and their associated meta & geos.
Expand Down Expand Up @@ -150,13 +91,13 @@ def generate_pipeline_ingest_input_query(
# Main query
query = (
db.query(
FamilyDocument,
Family,
FamilyDocument,
FamilyMetadata,
geography_subquery.c.geographies, # type: ignore
Organisation,
Corpus,
PhysicalDocument,
geography_subquery.c.geographies, # type: ignore
)
.select_from(FamilyDocument)
.join(Family, Family.import_id == FamilyDocument.family_import_id)
Expand All @@ -177,7 +118,6 @@ def generate_pipeline_ingest_input_query(
lazyload("*")
)
)
print(query)
results = query.all()
return results

Expand Down Expand Up @@ -236,13 +176,13 @@ def generate_pipeline_ingest_input(db: Session) -> Sequence[DocumentParserInput]
),
)
for (
family_document,
family,
family_document,
family_metadata,
geographies,
organisation,
corpus,
physical_document,
geographies,
) in results
]

Expand Down

0 comments on commit 0b0a8ce

Please sign in to comment.