Support file level reprocess flag
rbiseck3 committed May 13, 2024
1 parent 77a4bfc commit 9325b5b
Showing 7 changed files with 47 additions and 30 deletions.
7 changes: 7 additions & 0 deletions unstructured/ingest/v2/interfaces/file_data.py
@@ -37,6 +37,7 @@ class FileData(DataClassJsonMixin):
     source_identifiers: SourceIdentifiers
     doc_type: IndexDocType = field(default=IndexDocType.FILE)
     metadata: Optional[DataSourceMetadata] = None
+    reprocess: bool = False
 
     @classmethod
     def from_file(cls, path: str) -> "FileData":
@@ -47,3 +48,9 @@ def from_file(cls, path: str) -> "FileData":
             file_data_dict = json.load(f)
         file_data = FileData.from_dict(file_data_dict)
         return file_data
+
+    def to_file(self, path: str) -> None:
+        path = Path(path).resolve()
+        path.parent.mkdir(parents=True, exist_ok=True)
+        with open(str(path.resolve()), "w") as f:
+            json.dump(self.to_dict(), f, indent=2)
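
The new field rides along with the JSON round trip: from_file reads a serialized record and the added to_file writes one back, so a per-file flag set by one pipeline step survives to the next. A minimal sketch, assuming a FileData record already exists at a hypothetical path:

    from unstructured.ingest.v2.interfaces.file_data import FileData

    record_path = "/tmp/ingest/file_data/example.json"  # hypothetical record location

    file_data = FileData.from_file(path=record_path)  # load the JSON record
    file_data.reprocess = True                        # flag just this file for reprocessing
    file_data.to_file(path=record_path)               # persist the flag back to disk

    assert FileData.from_file(path=record_path).reprocess is True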
7 changes: 4 additions & 3 deletions unstructured/ingest/v2/interfaces/upload_stager.py
@@ -3,6 +3,7 @@
 from pathlib import Path
 from typing import Optional, TypeVar
 
+from unstructured.ingest.v2.interfaces.file_data import FileData
 from unstructured.ingest.v2.interfaces.process import BaseProcess
 
 
@@ -19,8 +20,8 @@ class UploadStager(BaseProcess, ABC):
     upload_stager_config: Optional[config_type] = None
 
     @abstractmethod
-    def run(self, elements_filepath: Path, **kwargs) -> Path:
+    def run(self, elements_filepath: Path, file_data: FileData, **kwargs) -> Path:
         pass
 
-    async def run_async(self, elements_filepath: Path, **kwargs) -> Path:
-        return self.run(elements_filepath=elements_filepath, **kwargs)
+    async def run_async(self, elements_filepath: Path, file_data: FileData, **kwargs) -> Path:
+        return self.run(elements_filepath=elements_filepath, file_data=file_data, **kwargs)
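
Concrete stagers must adopt the widened signature even if they ignore the new argument. A hypothetical subclass, sketched only to show the contract; the class name and no-op behavior are illustrative, not part of the library:

    from dataclasses import dataclass
    from pathlib import Path

    from unstructured.ingest.v2.interfaces.file_data import FileData
    from unstructured.ingest.v2.interfaces.upload_stager import UploadStager


    @dataclass
    class PassthroughUploadStager(UploadStager):
        # Hypothetical no-op stager: the FileData record is now in scope for
        # per-file decisions (naming, routing, reprocess checks) before upload.
        def run(self, elements_filepath: Path, file_data: FileData, **kwargs) -> Path:
            return elements_filepath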
11 changes: 7 additions & 4 deletions unstructured/ingest/v2/pipeline/steps/chunk.py
@@ -5,6 +5,7 @@
 from typing import Optional, TypedDict
 
 from unstructured.ingest.v2.chunker import Chunker
+from unstructured.ingest.v2.interfaces import FileData
 from unstructured.ingest.v2.logging import logger
 from unstructured.ingest.v2.pipeline.interfaces import PipelineStep, log_error
 from unstructured.staging.base import elements_to_dicts
@@ -22,8 +23,8 @@ class ChunkStep(PipelineStep):
     identifier: str = STEP_ID
     process: Chunker
 
-    def should_chunk(self, filepath: Path) -> bool:
-        if self.context.reprocess:
+    def should_chunk(self, filepath: Path, file_data: FileData) -> bool:
+        if self.context.reprocess or file_data.reprocess:
             return True
         if not filepath.exists():
             return True
@@ -43,8 +44,9 @@ def _save_output(self, output_filepath: str, chunked_content: list[dict]):
     @log_error()
     def run(self, path: str, file_data_path: str) -> ChunkStepResponse:
         path = Path(path)
+        file_data = FileData.from_file(path=file_data_path)
         output_filepath = self.get_output_filepath(filename=path)
-        if not self.should_chunk(filepath=output_filepath):
+        if not self.should_chunk(filepath=output_filepath, file_data=file_data):
             logger.info(f"Skipping chunking, output already exists: {output_filepath}")
             return ChunkStepResponse(file_data_path=file_data_path, path=str(output_filepath))
         chunked_content_raw = self.process.run(elements_filepath=path)
@@ -56,8 +58,9 @@ def run(self, path: str, file_data_path: str) -> ChunkStepResponse:
 
     async def run_async(self, path: str, file_data_path: str) -> ChunkStepResponse:
         path = Path(path)
+        file_data = FileData.from_file(path=file_data_path)
         output_filepath = self.get_output_filepath(filename=path)
-        if not self.should_chunk(filepath=output_filepath):
+        if not self.should_chunk(filepath=output_filepath, file_data=file_data):
             logger.info(f"Skipping chunking, output already exists: {output_filepath}")
             return ChunkStepResponse(file_data_path=file_data_path, path=str(output_filepath))
         chunked_content_raw = await self.process.run_async(elements_filepath=path)
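
The cache gate now has three triggers instead of two: the pipeline-wide reprocess context, the per-file flag carried in FileData, and a missing output file. The same widening appears in should_embed and should_partition in the files below. A self-contained sketch of the decision, using illustrative names rather than the step internals:

    from pathlib import Path


    def should_rerun(output: Path, pipeline_reprocess: bool, file_reprocess: bool) -> bool:
        # Distills the should_chunk / should_embed / should_partition gate:
        # either reprocess flag forces a rerun; otherwise rerun only when the
        # cached output is absent.
        if pipeline_reprocess or file_reprocess:
            return True
        return not output.exists()


    # Flags off: the decision falls through to the existence check.
    assert should_rerun(Path("missing.json"), pipeline_reprocess=False, file_reprocess=False)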
19 changes: 8 additions & 11 deletions unstructured/ingest/v2/pipeline/steps/download.py
@@ -30,7 +30,7 @@ def is_float(value: str):
         except ValueError:
             return False
 
-    def should_download(self, file_data: FileData) -> bool:
+    def should_download(self, file_data: FileData, file_data_path: str) -> bool:
         if self.context.re_download:
             return True
         download_path = self.process.get_download_path(file_data=file_data)
@@ -42,28 +42,25 @@ def should_download(self, file_data: FileData) -> bool:
             and self.is_float(file_data.metadata.date_modified)
             and download_path.stat().st_mtime > float(file_data.metadata.date_modified)
         ):
+            # Also update file data to mark this to reprocess since this won't change the filename
+            file_data.reprocess = True
+            file_data.to_file(path=file_data_path)
             return True
         return False
 
-    def get_file_data(self, path: str) -> FileData:
-        with open(path, "rb") as f:
-            file_data_dict = json.load(f)
-        file_data = FileData.from_dict(file_data_dict)
-        return file_data
-
     def run(self, file_data_path: str) -> list[DownloadStepResponse]:
-        file_data = self.get_file_data(path=file_data_path)
+        file_data = FileData.from_file(path=file_data_path)
         download_path = self.process.get_download_path(file_data=file_data)
-        if not self.should_download(file_data=file_data):
+        if not self.should_download(file_data=file_data, file_data_path=file_data_path):
             return [DownloadStepResponse(file_data_path=file_data_path, path=str(download_path))]
 
         download_path = self.process.run(file_data=file_data)
         return [DownloadStepResponse(file_data_path=file_data_path, path=str(download_path))]
 
     async def run_async(self, file_data_path: str) -> list[DownloadStepResponse]:
-        file_data = self.get_file_data(path=file_data_path)
+        file_data = FileData.from_file(path=file_data_path)
         download_path = self.process.get_download_path(file_data=file_data)
-        if not self.should_download(file_data=file_data):
+        if not self.should_download(file_data=file_data, file_data_path=file_data_path):
             return [DownloadStepResponse(file_data_path=file_data_path, path=str(download_path))]
 
         download_path = await self.process.run_async(file_data=file_data)
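
This hunk carries the core of the change: only the download step can notice that the source copy changed, and because a re-download lands at the same local path, downstream outputs keyed on that filename would still pass their existence checks. Persisting reprocess = True into the FileData record carries the invalidation forward. A sketch of that propagation as a hypothetical standalone helper:

    from unstructured.ingest.v2.interfaces.file_data import FileData


    def invalidate_downstream_caches(file_data: FileData, file_data_path: str) -> None:
        # Mirrors the new branch in should_download: the fresher download keeps
        # its filename, so partition/chunk/embed outputs still "exist"; the
        # persisted flag is what forces those steps to rerun.
        file_data.reprocess = True
        file_data.to_file(path=file_data_path)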
12 changes: 8 additions & 4 deletions unstructured/ingest/v2/pipeline/steps/embed.py
@@ -5,6 +5,7 @@
 from typing import Optional, TypedDict
 
 from unstructured.ingest.v2.embedder import Embedder
+from unstructured.ingest.v2.interfaces import FileData
 from unstructured.ingest.v2.logging import logger
 from unstructured.ingest.v2.pipeline.interfaces import PipelineStep, log_error
 from unstructured.staging.base import elements_to_dicts
@@ -22,8 +23,8 @@ class EmbedStep(PipelineStep):
     identifier: str = STEP_ID
     process: Embedder
 
-    def should_embed(self, filepath: Path) -> bool:
-        if self.context.reprocess:
+    def should_embed(self, filepath: Path, file_data: FileData) -> bool:
+        if self.context.reprocess or file_data.reprocess:
             return True
         if not filepath.exists():
             return True
@@ -43,8 +44,10 @@ def _save_output(self, output_filepath: str, embedded_content: list[dict]):
     @log_error()
     def run(self, path: str, file_data_path: str) -> EmbedStepResponse:
         path = Path(path)
+        file_data = FileData.from_file(path=file_data_path)
+
         output_filepath = self.get_output_filepath(filename=path)
-        if not self.should_embed(filepath=output_filepath):
+        if not self.should_embed(filepath=output_filepath, file_data=file_data):
             logger.info(f"Skipping embedding, output already exists: {output_filepath}")
             return EmbedStepResponse(file_data_path=file_data_path, path=str(output_filepath))
         embed_content_raw = self.process.run(elements_filepath=path)
@@ -56,8 +59,9 @@ def run(self, path: str, file_data_path: str) -> EmbedStepResponse:
 
     async def run_async(self, path: str, file_data_path: str) -> EmbedStepResponse:
         path = Path(path)
+        file_data = FileData.from_file(path=file_data_path)
         output_filepath = self.get_output_filepath(filename=path)
-        if not self.should_embed(filepath=output_filepath):
+        if not self.should_embed(filepath=output_filepath, file_data=file_data):
             logger.info(f"Skipping embedding, output already exists: {output_filepath}")
             return EmbedStepResponse(file_data_path=file_data_path, path=str(output_filepath))
         embed_content_raw = await self.process.run_async(elements_filepath=path)
12 changes: 6 additions & 6 deletions unstructured/ingest/v2/pipeline/steps/partition.py
@@ -22,8 +22,8 @@ class PartitionStep(PipelineStep):
     identifier: str = STEP_ID
     process: Partitioner
 
-    def should_partition(self, filepath: Path) -> bool:
-        if self.context.reprocess:
+    def should_partition(self, filepath: Path, file_data: FileData) -> bool:
+        if self.context.reprocess or file_data.reprocess:
             return True
         if not filepath.exists():
             return True
@@ -43,11 +43,11 @@ def _save_output(self, output_filepath: str, partitioned_content: list[dict]):
     @log_error()
     def run(self, path: str, file_data_path: str) -> PartitionStepResponse:
         path = Path(path)
+        file_data = FileData.from_file(path=file_data_path)
         output_filepath = self.get_output_filepath(filename=path)
-        if not self.should_partition(filepath=output_filepath):
+        if not self.should_partition(filepath=output_filepath, file_data=file_data):
             logger.info(f"Skipping partitioning, output already exists: {output_filepath}")
             return PartitionStepResponse(file_data_path=file_data_path, path=str(output_filepath))
-        file_data = FileData.from_file(path=file_data_path)
         partitioned_content = self.process.run(filename=path, metadata=file_data.metadata)
         self._save_output(
             output_filepath=str(output_filepath), partitioned_content=partitioned_content
@@ -56,11 +56,11 @@ def run(self, path: str, file_data_path: str) -> PartitionStepResponse:
 
     async def run_async(self, path: str, file_data_path: str) -> PartitionStepResponse:
         path = Path(path)
+        file_data = FileData.from_file(path=file_data_path)
         output_filepath = self.get_output_filepath(filename=path)
-        if not self.should_partition(filepath=output_filepath):
+        if not self.should_partition(filepath=output_filepath, file_data=file_data):
             logger.info(f"Skipping partitioning, output already exists: {output_filepath}")
             return PartitionStepResponse(file_data_path=file_data_path, path=str(output_filepath))
         partitioned_content = await self.process.run_async(
             filename=path, metadata=file_data.metadata
         )
9 changes: 7 additions & 2 deletions unstructured/ingest/v2/pipeline/steps/stage.py
@@ -2,6 +2,7 @@
 from pathlib import Path
 from typing import TypedDict
 
+from unstructured.ingest.v2.interfaces.file_data import FileData
 from unstructured.ingest.v2.interfaces.upload_stager import UploadStager
 from unstructured.ingest.v2.pipeline.interfaces import PipelineStep, log_error
 
@@ -21,10 +22,14 @@ class UploadStageStep(PipelineStep):
     @log_error()
     def run(self, path: str, file_data_path: str) -> UploadStageStepResponse:
         path = Path(path)
-        staged_output_path = self.process.run(elements_filepath=path)
+        staged_output_path = self.process.run(
+            elements_filepath=path, file_data=FileData.from_file(path=file_data_path)
+        )
         return UploadStageStepResponse(file_data_path=file_data_path, path=str(staged_output_path))
 
     async def run_async(self, path: str, file_data_path: str) -> UploadStageStepResponse:
         path = Path(path)
-        staged_output_path = await self.process.run_async(elements_filepath=path)
+        staged_output_path = await self.process.run_async(
+            elements_filepath=path, file_data=FileData.from_file(path=file_data_path)
+        )
         return UploadStageStepResponse(file_data_path=file_data_path, path=str(staged_output_path))
