From 29dc0a7273bd829d06d1bd8db66f334668fe0ed4 Mon Sep 17 00:00:00 2001 From: Roman Isecke Date: Mon, 13 May 2024 11:17:16 -0400 Subject: [PATCH] Add local destination as default --- unstructured/ingest/v2/connectors/local.py | 4 +- unstructured/ingest/v2/example.py | 4 +- .../ingest/v2/interfaces/processor.py | 1 - unstructured/ingest/v2/pipeline/context.py | 20 --------- unstructured/ingest/v2/pipeline/interfaces.py | 5 +-- unstructured/ingest/v2/pipeline/pipeline.py | 41 ++++++++++++++++--- 6 files changed, 41 insertions(+), 34 deletions(-) delete mode 100644 unstructured/ingest/v2/pipeline/context.py diff --git a/unstructured/ingest/v2/connectors/local.py b/unstructured/ingest/v2/connectors/local.py index 7042467498..d6473409a3 100644 --- a/unstructured/ingest/v2/connectors/local.py +++ b/unstructured/ingest/v2/connectors/local.py @@ -106,7 +106,7 @@ def check_connection(self): @dataclass class LocalUploaderConfig(UploaderConfig): - output_directory: str + output_directory: str = field(default="structured-output") @property def output_path(self) -> Path: @@ -119,7 +119,7 @@ def __post_init__(self): @dataclass class LocalUploader(Uploader): - upload_config: LocalUploaderConfig + upload_config: LocalUploaderConfig = field(default_factory=LocalUploaderConfig) def is_async(self) -> bool: return False diff --git a/unstructured/ingest/v2/example.py b/unstructured/ingest/v2/example.py index 34234f82fe..3dc87a9ce7 100644 --- a/unstructured/ingest/v2/example.py +++ b/unstructured/ingest/v2/example.py @@ -10,8 +10,8 @@ LocalUploaderConfig, ) from unstructured.ingest.v2.embedder import Embedder, EmbedderConfig +from unstructured.ingest.v2.interfaces import ProcessorConfig from unstructured.ingest.v2.partitioner import Partitioner, PartitionerConfig -from unstructured.ingest.v2.pipeline.context import PipelineContext from unstructured.ingest.v2.pipeline.pipeline import Pipeline base_path = Path(__file__).parent.parent.parent.parent @@ -37,7 +37,7 @@ ) ) pipeline = Pipeline( 
- context=PipelineContext(work_dir=str(work_dir.resolve())), + context=ProcessorConfig(work_dir=str(work_dir.resolve())), indexer=source.indexer, downloader=source.downloader, partitioner=partitioner, diff --git a/unstructured/ingest/v2/interfaces/processor.py b/unstructured/ingest/v2/interfaces/processor.py index ab22a9ef70..c18c09569a 100644 --- a/unstructured/ingest/v2/interfaces/processor.py +++ b/unstructured/ingest/v2/interfaces/processor.py @@ -13,7 +13,6 @@ class ProcessorConfig: (Path.home() / ".cache" / "unstructured" / "ingest" / "pipeline").resolve() ) ) - output_dir: str = "structured-output" num_processes: int = 2 raise_on_error: bool = False disable_parallelism: bool = field( diff --git a/unstructured/ingest/v2/pipeline/context.py b/unstructured/ingest/v2/pipeline/context.py deleted file mode 100644 index 4159afca9b..0000000000 --- a/unstructured/ingest/v2/pipeline/context.py +++ /dev/null @@ -1,20 +0,0 @@ -from dataclasses import dataclass, field -from multiprocessing.managers import DictProxy -from typing import Optional - -from unstructured.ingest.v2.interfaces import ProcessorConfig - - -@dataclass -class PipelineContext(ProcessorConfig): - _statuses: Optional[DictProxy] = field(init=False, default=None) - - @property - def statuses(self) -> DictProxy: - if self._statuses is None: - raise ValueError("statuses never initialized") - return self._statuses - - @statuses.setter - def statuses(self, value: DictProxy): - self._statuses = value diff --git a/unstructured/ingest/v2/pipeline/interfaces.py b/unstructured/ingest/v2/pipeline/interfaces.py index 4625cd38ee..5a7654b39f 100644 --- a/unstructured/ingest/v2/pipeline/interfaces.py +++ b/unstructured/ingest/v2/pipeline/interfaces.py @@ -5,9 +5,8 @@ from pathlib import Path from typing import Any, Callable, Optional, TypeVar -from unstructured.ingest.v2.interfaces import BaseProcess +from unstructured.ingest.v2.interfaces import BaseProcess, ProcessorConfig from unstructured.ingest.v2.logging import 
logger -from unstructured.ingest.v2.pipeline.context import PipelineContext process_type = TypeVar("process_type", bound=BaseProcess) iterable_input = list[dict[str, Any]] @@ -17,7 +16,7 @@ class PipelineStep(ABC): identifier: str process: process_type - context: PipelineContext + context: ProcessorConfig def process_serially(self, iterable: iterable_input) -> Any: logger.info("processing content serially") diff --git a/unstructured/ingest/v2/pipeline/pipeline.py b/unstructured/ingest/v2/pipeline/pipeline.py index 5b0f46f085..715d235425 100644 --- a/unstructured/ingest/v2/pipeline/pipeline.py +++ b/unstructured/ingest/v2/pipeline/pipeline.py @@ -1,7 +1,8 @@ -import multiprocessing as mp from dataclasses import InitVar, dataclass, field -from unstructured.ingest.v2.pipeline.context import PipelineContext +from unstructured.ingest.v2.connectors.local import LocalUploader +from unstructured.ingest.v2.interfaces import ProcessorConfig +from unstructured.ingest.v2.logging import logger from unstructured.ingest.v2.pipeline.steps.chunk import Chunker, ChunkStep from unstructured.ingest.v2.pipeline.steps.download import DownloadStep, download_type from unstructured.ingest.v2.pipeline.steps.embed import Embedder, EmbedStep @@ -13,7 +14,7 @@ @dataclass class Pipeline: - context: PipelineContext + context: ProcessorConfig indexer: InitVar[index_type] indexer_step: IndexStep = field(init=False) downloader: InitVar[download_type] @@ -26,7 +27,7 @@ class Pipeline: embedder_step: EmbedStep = field(init=False, default=None) stager: InitVar[UploadStager] = None stager_step: UploadStageStep = field(init=False, default=None) - uploader: InitVar[Uploader] = None + uploader: InitVar[Uploader] = field(default=LocalUploader) uploader_step: UploadStep = field(init=False, default=None) def __post_init__( @@ -47,9 +48,36 @@ def __post_init__( self.stager_step = UploadStageStep(process=stager, context=self.context) if stager else None self.uploader_step = UploadStep(process=uploader, 
context=self.context) + def cleanup(self): + pass + + def __str__(self): + s = [] + s.append(f"{self.indexer_step.identifier} ({self.indexer_step.process.__class__.__name__})") + s.append( + f"{self.downloader_step.identifier} ({self.downloader_step.process.__class__.__name__})" + ) + s.append(f"{self.partitioner_step.identifier}") + if self.chunker_step: + s.append( + f"{self.chunker_step.identifier} " + f"({self.chunker_step.process.config.chunking_strategy})" + ) + if self.embedder_step: + s.append( + f"{self.embedder_step.identifier} ({self.embedder_step.process.config.provider})" + ) + if self.stager_step: + s.append( + f"{self.stager_step.identifier} ({self.stager_step.process.__class__.__name__})" + ) + s.append( + f"{self.uploader_step.identifier} ({self.uploader_step.process.__class__.__name__})" + ) + return " -> ".join(s) + def run(self): - manager = mp.Manager() - self.context.statuses = manager.dict() + logger.info(f"Running local pipeline: {self}") indices = self.indexer_step.run() indies_inputs = [{"file_data_path": i} for i in indices] downloaded_data = self.downloader_step(indies_inputs) @@ -64,3 +92,4 @@ def run(self): elements = self.stager_step(elements) self.uploader_step.run(contents=elements) + self.cleanup()