From 9325b5b3ac8dcf94b02f171c7f968b6a96edf019 Mon Sep 17 00:00:00 2001 From: Roman Isecke Date: Mon, 13 May 2024 10:44:41 -0400 Subject: [PATCH] Support file level reprocess flag --- .../ingest/v2/interfaces/file_data.py | 7 +++++++ .../ingest/v2/interfaces/upload_stager.py | 7 ++++--- .../ingest/v2/pipeline/steps/chunk.py | 11 +++++++---- .../ingest/v2/pipeline/steps/download.py | 19 ++++++++----------- .../ingest/v2/pipeline/steps/embed.py | 12 ++++++++---- .../ingest/v2/pipeline/steps/partition.py | 12 ++++++------ .../ingest/v2/pipeline/steps/stage.py | 9 +++++++-- 7 files changed, 47 insertions(+), 30 deletions(-) diff --git a/unstructured/ingest/v2/interfaces/file_data.py b/unstructured/ingest/v2/interfaces/file_data.py index a749f9fd3a..55e845360a 100644 --- a/unstructured/ingest/v2/interfaces/file_data.py +++ b/unstructured/ingest/v2/interfaces/file_data.py @@ -37,6 +37,7 @@ class FileData(DataClassJsonMixin): source_identifiers: SourceIdentifiers doc_type: IndexDocType = field(default=IndexDocType.FILE) metadata: Optional[DataSourceMetadata] = None + reprocess: bool = False @classmethod def from_file(cls, path: str) -> "FileData": @@ -47,3 +48,9 @@ def from_file(cls, path: str) -> "FileData": file_data_dict = json.load(f) file_data = FileData.from_dict(file_data_dict) return file_data + + def to_file(self, path: str) -> None: + path = Path(path).resolve() + path.parent.mkdir(parents=True, exist_ok=True) + with open(str(path.resolve()), "w") as f: + json.dump(self.to_dict(), f, indent=2) diff --git a/unstructured/ingest/v2/interfaces/upload_stager.py b/unstructured/ingest/v2/interfaces/upload_stager.py index 2bdd158940..5bf1515717 100644 --- a/unstructured/ingest/v2/interfaces/upload_stager.py +++ b/unstructured/ingest/v2/interfaces/upload_stager.py @@ -3,6 +3,7 @@ from pathlib import Path from typing import Optional, TypeVar +from unstructured.ingest.v2.interfaces.file_data import FileData from unstructured.ingest.v2.interfaces.process import BaseProcess @@ -19,8 +20,8 @@ class UploadStager(BaseProcess, ABC): upload_stager_config: Optional[config_type] = None @abstractmethod - def run(self, elements_filepath: Path, **kwargs) -> Path: + def run(self, elements_filepath: Path, file_data: FileData, **kwargs) -> Path: pass - async def run_async(self, elements_filepath: Path, **kwargs) -> Path: - return self.run(elements_filepath=elements_filepath, **kwargs) + async def run_async(self, elements_filepath: Path, file_data: FileData, **kwargs) -> Path: + return self.run(elements_filepath=elements_filepath, file_data=file_data, **kwargs) diff --git a/unstructured/ingest/v2/pipeline/steps/chunk.py b/unstructured/ingest/v2/pipeline/steps/chunk.py index 8c2c63c657..6c842558e0 100644 --- a/unstructured/ingest/v2/pipeline/steps/chunk.py +++ b/unstructured/ingest/v2/pipeline/steps/chunk.py @@ -5,6 +5,7 @@ from typing import Optional, TypedDict from unstructured.ingest.v2.chunker import Chunker +from unstructured.ingest.v2.interfaces import FileData from unstructured.ingest.v2.logging import logger from unstructured.ingest.v2.pipeline.interfaces import PipelineStep, log_error from unstructured.staging.base import elements_to_dicts @@ -22,8 +23,8 @@ class ChunkStep(PipelineStep): identifier: str = STEP_ID process: Chunker - def should_chunk(self, filepath: Path) -> bool: - if self.context.reprocess: + def should_chunk(self, filepath: Path, file_data: FileData) -> bool: + if self.context.reprocess or file_data.reprocess: return True if not filepath.exists(): return True @@ -43,8 +44,9 @@ def _save_output(self, output_filepath: str, chunked_content: list[dict]): @log_error() def run(self, path: str, file_data_path: str) -> ChunkStepResponse: path = Path(path) + file_data = FileData.from_file(path=file_data_path) output_filepath = self.get_output_filepath(filename=path) - if not self.should_chunk(filepath=output_filepath): + if not self.should_chunk(filepath=output_filepath, file_data=file_data): logger.info(f"Skipping chunking, output already exists: {output_filepath}") return ChunkStepResponse(file_data_path=file_data_path, path=str(output_filepath)) chunked_content_raw = self.process.run(elements_filepath=path) @@ -56,8 +58,9 @@ def run(self, path: str, file_data_path: str) -> ChunkStepResponse: async def run_async(self, path: str, file_data_path: str) -> ChunkStepResponse: path = Path(path) + file_data = FileData.from_file(path=file_data_path) output_filepath = self.get_output_filepath(filename=path) - if not self.should_chunk(filepath=output_filepath): + if not self.should_chunk(filepath=output_filepath, file_data=file_data): logger.info(f"Skipping chunking, output already exists: {output_filepath}") return ChunkStepResponse(file_data_path=file_data_path, path=str(output_filepath)) chunked_content_raw = await self.process.run_async(elements_filepath=path) diff --git a/unstructured/ingest/v2/pipeline/steps/download.py b/unstructured/ingest/v2/pipeline/steps/download.py index b12f7ad465..8a33ccdd7b 100644 --- a/unstructured/ingest/v2/pipeline/steps/download.py +++ b/unstructured/ingest/v2/pipeline/steps/download.py @@ -30,7 +30,7 @@ def is_float(value: str): except ValueError: return False - def should_download(self, file_data: FileData) -> bool: + def should_download(self, file_data: FileData, file_data_path: str) -> bool: if self.context.re_download: return True download_path = self.process.get_download_path(file_data=file_data) @@ -42,28 +42,25 @@ def should_download(self, file_data: FileData) -> bool: and self.is_float(file_data.metadata.date_modified) and download_path.stat().st_mtime > float(file_data.metadata.date_modified) ): + # Also update file data to mark this to reprocess since this won't change the filename + file_data.reprocess = True + file_data.to_file(path=file_data_path) return True return False - def get_file_data(self, path: str) -> FileData: - with open(path, "rb") as f: - file_data_dict = json.load(f) - file_data = FileData.from_dict(file_data_dict) - return file_data - def run(self, file_data_path: str) -> list[DownloadStepResponse]: - file_data = self.get_file_data(path=file_data_path) + file_data = FileData.from_file(path=file_data_path) download_path = self.process.get_download_path(file_data=file_data) - if not self.should_download(file_data=file_data): + if not self.should_download(file_data=file_data, file_data_path=file_data_path): return [DownloadStepResponse(file_data_path=file_data_path, path=str(download_path))] download_path = self.process.run(file_data=file_data) return [DownloadStepResponse(file_data_path=file_data_path, path=str(download_path))] async def run_async(self, file_data_path: str) -> list[DownloadStepResponse]: - file_data = self.get_file_data(path=file_data_path) + file_data = FileData.from_file(path=file_data_path) download_path = self.process.get_download_path(file_data=file_data) - if not self.should_download(file_data=file_data): + if not self.should_download(file_data=file_data, file_data_path=file_data_path): return [DownloadStepResponse(file_data_path=file_data_path, path=str(download_path))] download_path = await self.process.run_async(file_data=file_data) diff --git a/unstructured/ingest/v2/pipeline/steps/embed.py b/unstructured/ingest/v2/pipeline/steps/embed.py index b160f5ef70..240d73b70e 100644 --- a/unstructured/ingest/v2/pipeline/steps/embed.py +++ b/unstructured/ingest/v2/pipeline/steps/embed.py @@ -5,6 +5,7 @@ from typing import Optional, TypedDict from unstructured.ingest.v2.embedder import Embedder +from unstructured.ingest.v2.interfaces import FileData from unstructured.ingest.v2.logging import logger from unstructured.ingest.v2.pipeline.interfaces import PipelineStep, log_error from unstructured.staging.base import elements_to_dicts @@ -22,8 +23,8 @@ class EmbedStep(PipelineStep): identifier: str = STEP_ID process: Embedder - def should_embed(self, filepath: Path) -> bool: - if self.context.reprocess: + def should_embed(self, filepath: Path, file_data: FileData) -> bool: + if self.context.reprocess or file_data.reprocess: return True if not filepath.exists(): return True @@ -43,8 +44,10 @@ def _save_output(self, output_filepath: str, embedded_content: list[dict]): @log_error() def run(self, path: str, file_data_path: str) -> EmbedStepResponse: path = Path(path) + file_data = FileData.from_file(path=file_data_path) + output_filepath = self.get_output_filepath(filename=path) - if not self.should_embed(filepath=output_filepath): + if not self.should_embed(filepath=output_filepath, file_data=file_data): logger.info(f"Skipping embedding, output already exists: {output_filepath}") return EmbedStepResponse(file_data_path=file_data_path, path=str(output_filepath)) embed_content_raw = self.process.run(elements_filepath=path) @@ -56,8 +59,9 @@ def run(self, path: str, file_data_path: str) -> EmbedStepResponse: async def run_async(self, path: str, file_data_path: str) -> EmbedStepResponse: path = Path(path) + file_data = FileData.from_file(path=file_data_path) output_filepath = self.get_output_filepath(filename=path) - if not self.should_embed(filepath=output_filepath): + if not self.should_embed(filepath=output_filepath, file_data=file_data): logger.info(f"Skipping embedding, output already exists: {output_filepath}") return EmbedStepResponse(file_data_path=file_data_path, path=str(output_filepath)) embed_content_raw = await self.process.run_async(elements_filepath=path) diff --git a/unstructured/ingest/v2/pipeline/steps/partition.py b/unstructured/ingest/v2/pipeline/steps/partition.py index 52aa764286..2ffdbb81b6 100644 --- a/unstructured/ingest/v2/pipeline/steps/partition.py +++ b/unstructured/ingest/v2/pipeline/steps/partition.py @@ -22,8 +22,8 @@ class PartitionStep(PipelineStep): identifier: str = STEP_ID process: Partitioner - def should_partition(self, filepath: Path) -> bool: - if self.context.reprocess: + def should_partition(self, filepath: Path, file_data: FileData) -> bool: + if self.context.reprocess or file_data.reprocess: return True if not filepath.exists(): return True @@ -43,11 +43,11 @@ def _save_output(self, output_filepath: str, partitioned_content: list[dict]): @log_error() def run(self, path: str, file_data_path: str) -> PartitionStepResponse: path = Path(path) + file_data = FileData.from_file(path=file_data_path) output_filepath = self.get_output_filepath(filename=path) - if not self.should_partition(filepath=output_filepath): + if not self.should_partition(filepath=output_filepath, file_data=file_data): logger.info(f"Skipping partitioning, output already exists: {output_filepath}") return PartitionStepResponse(file_data_path=file_data_path, path=str(output_filepath)) - file_data = FileData.from_file(path=file_data_path) partitioned_content = self.process.run(filename=path, metadata=file_data.metadata) self._save_output( output_filepath=str(output_filepath), partitioned_content=partitioned_content @@ -56,11 +56,11 @@ def run(self, path: str, file_data_path: str) -> PartitionStepResponse: async def run_async(self, path: str, file_data_path: str) -> PartitionStepResponse: path = Path(path) + file_data = FileData.from_file(path=file_data_path) output_filepath = self.get_output_filepath(filename=path) - if not self.should_partition(filepath=output_filepath): + if not self.should_partition(filepath=output_filepath, file_data=file_data): logger.info(f"Skipping partitioning, output already exists: {output_filepath}") return PartitionStepResponse(file_data_path=file_data_path, path=str(output_filepath)) - file_data = FileData.from_file(path=file_data_path) partitioned_content = await self.process.run_async( filename=path, metadata=file_data.metadata ) diff --git a/unstructured/ingest/v2/pipeline/steps/stage.py b/unstructured/ingest/v2/pipeline/steps/stage.py index d1455be591..47cf4b75b6 100644 --- a/unstructured/ingest/v2/pipeline/steps/stage.py +++ b/unstructured/ingest/v2/pipeline/steps/stage.py @@ -2,6 +2,7 @@ from pathlib import Path from typing import TypedDict +from unstructured.ingest.v2.interfaces.file_data import FileData from unstructured.ingest.v2.interfaces.upload_stager import UploadStager from unstructured.ingest.v2.pipeline.interfaces import PipelineStep, log_error @@ -21,10 +22,14 @@ class UploadStageStep(PipelineStep): @log_error() def run(self, path: str, file_data_path: str) -> UploadStageStepResponse: path = Path(path) - staged_output_path = self.process.run(elements_filepath=path) + staged_output_path = self.process.run( + elements_filepath=path, file_data=FileData.from_file(path=file_data_path) + ) return UploadStageStepResponse(file_data_path=file_data_path, path=str(staged_output_path)) async def run_async(self, path: str, file_data_path: str) -> UploadStageStepResponse: path = Path(path) - staged_output_path = await self.process.run_async(elements_filepath=path) + staged_output_path = await self.process.run_async( + elements_filepath=path, file_data=FileData.from_file(path=file_data_path) + ) return UploadStageStepResponse(file_data_path=file_data_path, path=str(staged_output_path))