diff --git a/app/core/ingestion/pipeline.py b/app/core/ingestion/pipeline.py index 4c793573..a8d9db8b 100644 --- a/app/core/ingestion/pipeline.py +++ b/app/core/ingestion/pipeline.py @@ -24,65 +24,6 @@ MetadataType = dict[str, list[str]] -def fetch_family_and_metadata( - db: Session, -) -> Sequence[Tuple[Family, FamilyMetadata, Organisation, Corpus]]: - """Fetch distinct family & family metadata information from the db. - - This function queries the database to retrieve a list of families - along with their associated metadata, organisation, and corpus info. - It ensures that only distinct combinations of these entities are - returned. - - :param Session db: The db session to query against. - :return Sequence[ - Tuple[Family, FamilyMetadata, Organisation, Corpus] - ]: A list of tuples, each containing a Family, FamilyMetadata, - Organisation, and Corpus object. - """ - _LOGGER.info("Running pipeline family query") - return ( - db.query( - Family.import_id.label("family_import_id"), - Family, - FamilyMetadata, - Organisation, - Corpus, - ) - .join(FamilyMetadata, Family.import_id == FamilyMetadata.family_import_id) - .join(FamilyCorpus, Family.import_id == FamilyCorpus.family_import_id) - .join(Corpus, Corpus.import_id == FamilyCorpus.corpus_import_id) - .join(Organisation, Corpus.organisation_id == Organisation.id) - .subquery() - ) - - -def fetch_documents(db: Session) -> Sequence[Tuple[FamilyDocument, PhysicalDocument]]: - """Fetch non-deleted documents and their associated physical docs. - - This function queries the database to retrieve a list of family - documents that are not marked as deleted and any associated physical - documents. Only distinct combinations are returned. - - :param Session db: The db session to query against. - :return Sequence[Tuple[FamilyDocument, PhysicalDocument]]: A list of - tuples, each containing a FamilyDocument and PhysicalDocument. - """ - _LOGGER.info("Running pipeline document query") - return ( - db.query( - FamilyDocument.import_id.label("doc_import_id"), - FamilyDocument, - PhysicalDocument, - ) - .join( - PhysicalDocument, PhysicalDocument.id == FamilyDocument.physical_document_id - ) - .filter(FamilyDocument.document_status != DocumentStatus.DELETED) - .subquery() - ) - - def fetch_geographies(db: Session) -> Sequence[Tuple[str, list[str]]]: """Fetch unique geographies associated with each family. @@ -111,13 +52,13 @@ def generate_pipeline_ingest_input_query( db: Session, ) -> Sequence[ Tuple[ - FamilyDocument, Family, + FamilyDocument, FamilyMetadata, + list[str], Organisation, Corpus, PhysicalDocument, - list[str], ] ]: """Get a list of non-deleted docs and their associated meta & geos. @@ -150,13 +91,13 @@ def generate_pipeline_ingest_input_query( # Main query query = ( db.query( - FamilyDocument, Family, + FamilyDocument, FamilyMetadata, + geography_subquery.c.geographies, # type: ignore Organisation, Corpus, PhysicalDocument, - geography_subquery.c.geographies, # type: ignore ) .select_from(FamilyDocument) .join(Family, Family.import_id == FamilyDocument.family_import_id) @@ -177,7 +118,6 @@ def generate_pipeline_ingest_input_query( lazyload("*") ) ) - print(query) results = query.all() return results @@ -236,13 +176,13 @@ def generate_pipeline_ingest_input(db: Session) -> Sequence[DocumentParserInput] ), ) for ( - family_document, family, + family_document, family_metadata, + geographies, organisation, corpus, physical_document, - geographies, ) in results ]