Add local destination as default
rbiseck3 committed May 13, 2024
1 parent 9325b5b commit 29dc0a7
Showing 6 changed files with 41 additions and 34 deletions.
unstructured/ingest/v2/connectors/local.py (4 changes: 2 additions & 2 deletions)

@@ -106,7 +106,7 @@ def check_connection(self):

@dataclass
class LocalUploaderConfig(UploaderConfig):
-    output_directory: str
+    output_directory: str = field(default="structured-output")

    @property
    def output_path(self) -> Path:
@@ -119,7 +119,7 @@ def __post_init__(self):

@dataclass
class LocalUploader(Uploader):
-    upload_config: LocalUploaderConfig
+    upload_config: LocalUploaderConfig = field(default_factory=LocalUploaderConfig)

    def is_async(self) -> bool:
        return False
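
With these defaults in place, the local destination needs no explicit configuration. A minimal sketch of what that enables, assuming the Uploader and UploaderConfig base classes add no other required fields:

from unstructured.ingest.v2.connectors.local import LocalUploader, LocalUploaderConfig

# Zero-argument construction should now work: upload_config defaults to
# LocalUploaderConfig(), whose output_directory defaults to "structured-output".
uploader = LocalUploader()
assert uploader.upload_config.output_directory == "structured-output"
print(uploader.upload_config.output_path)  # resolved by the output_path property shown above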
unstructured/ingest/v2/example.py (4 changes: 2 additions & 2 deletions)

@@ -10,8 +10,8 @@
    LocalUploaderConfig,
)
from unstructured.ingest.v2.embedder import Embedder, EmbedderConfig
+from unstructured.ingest.v2.interfaces import ProcessorConfig
from unstructured.ingest.v2.partitioner import Partitioner, PartitionerConfig
-from unstructured.ingest.v2.pipeline.context import PipelineContext
from unstructured.ingest.v2.pipeline.pipeline import Pipeline

base_path = Path(__file__).parent.parent.parent.parent
@@ -37,7 +37,7 @@
    )
)
pipeline = Pipeline(
-    context=PipelineContext(work_dir=str(work_dir.resolve())),
+    context=ProcessorConfig(work_dir=str(work_dir.resolve())),
    indexer=source.indexer,
    downloader=source.downloader,
    partitioner=partitioner,
unstructured/ingest/v2/interfaces/processor.py (1 change: 0 additions & 1 deletion)

@@ -13,7 +13,6 @@ class ProcessorConfig:
            (Path.home() / ".cache" / "unstructured" / "ingest" / "pipeline").resolve()
        )
    )
-    output_dir: str = "structured-output"
    num_processes: int = 2
    raise_on_error: bool = False
    disable_parallelism: bool = field(
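
With output_dir removed here, the output location is configured on the destination side instead of on ProcessorConfig. A hedged before/after sketch, assuming LocalUploader takes upload_config as a keyword and requires nothing else:

# Before this commit (line deleted above):
#   ProcessorConfig(output_dir="structured-output")

# After this commit, the equivalent setting lives on the local destination:
from unstructured.ingest.v2.connectors.local import LocalUploader, LocalUploaderConfig

uploader = LocalUploader(upload_config=LocalUploaderConfig(output_directory="structured-output"))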
unstructured/ingest/v2/pipeline/context.py (20 changes: 0 additions & 20 deletions)

This file was deleted.

unstructured/ingest/v2/pipeline/interfaces.py (5 changes: 2 additions & 3 deletions)

@@ -5,9 +5,8 @@
from pathlib import Path
from typing import Any, Callable, Optional, TypeVar

-from unstructured.ingest.v2.interfaces import BaseProcess
+from unstructured.ingest.v2.interfaces import BaseProcess, ProcessorConfig
from unstructured.ingest.v2.logging import logger
-from unstructured.ingest.v2.pipeline.context import PipelineContext

process_type = TypeVar("process_type", bound=BaseProcess)
iterable_input = list[dict[str, Any]]
@@ -17,7 +16,7 @@
class PipelineStep(ABC):
    identifier: str
    process: process_type
-    context: PipelineContext
+    context: ProcessorConfig

    def process_serially(self, iterable: iterable_input) -> Any:
        logger.info("processing content serially")
unstructured/ingest/v2/pipeline/pipeline.py (41 changes: 35 additions & 6 deletions)

@@ -1,7 +1,8 @@
import multiprocessing as mp
from dataclasses import InitVar, dataclass, field

-from unstructured.ingest.v2.pipeline.context import PipelineContext
+from unstructured.ingest.v2.connectors.local import LocalUploader
+from unstructured.ingest.v2.interfaces import ProcessorConfig
from unstructured.ingest.v2.logging import logger
from unstructured.ingest.v2.pipeline.steps.chunk import Chunker, ChunkStep
from unstructured.ingest.v2.pipeline.steps.download import DownloadStep, download_type
from unstructured.ingest.v2.pipeline.steps.embed import Embedder, EmbedStep
@@ -13,7 +14,7 @@

@dataclass
class Pipeline:
-    context: PipelineContext
+    context: ProcessorConfig
    indexer: InitVar[index_type]
    indexer_step: IndexStep = field(init=False)
    downloader: InitVar[download_type]
@@ -26,7 +27,7 @@ class Pipeline:
    embedder_step: EmbedStep = field(init=False, default=None)
    stager: InitVar[UploadStager] = None
    stager_step: UploadStageStep = field(init=False, default=None)
-    uploader: InitVar[Uploader] = None
+    uploader: InitVar[Uploader] = field(default=LocalUploader)
    uploader_step: UploadStep = field(init=False, default=None)

    def __post_init__(
@@ -47,9 +48,36 @@ def __post_init__(
        self.stager_step = UploadStageStep(process=stager, context=self.context) if stager else None
        self.uploader_step = UploadStep(process=uploader, context=self.context)

+    def cleanup(self):
+        pass
+
+    def __str__(self):
+        s = []
+        s.append(f"{self.indexer_step.identifier} ({self.indexer_step.process.__class__.__name__})")
+        s.append(
+            f"{self.downloader_step.identifier} ({self.downloader_step.process.__class__.__name__})"
+        )
+        s.append(f"{self.partitioner_step.identifier}")
+        if self.chunker_step:
+            s.append(
+                f"{self.chunker_step.identifier} "
+                f"({self.chunker_step.process.config.chunking_strategy})"
+            )
+        if self.embedder_step:
+            s.append(
+                f"{self.embedder_step.identifier} ({self.embedder_step.process.config.provider})"
+            )
+        if self.stager_step:
+            s.append(
+                f"{self.stager_step.identifier} ({self.stager_step.process.__class__.__name__})"
+            )
+        s.append(
+            f"{self.uploader_step.identifier} ({self.uploader_step.process.__class__.__name__})"
+        )
+        return " -> ".join(s)
+
    def run(self):
        manager = mp.Manager()
        self.context.statuses = manager.dict()
+        logger.info(f"Running local pipeline: {self}")
        indices = self.indexer_step.run()
        indies_inputs = [{"file_data_path": i} for i in indices]
        downloaded_data = self.downloader_step(indies_inputs)
@@ -64,3 +92,4 @@ def run(self):
            elements = self.stager_step(elements)

        self.uploader_step.run(contents=elements)
+        self.cleanup()
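
Read together with example.py above, a hedged sketch of how a pipeline might be wired after this commit; source and partitioner stand in for the source connector and Partitioner already built in example.py (their construction is not shown in this diff), and the step identifiers in the comment are illustrative only:

from unstructured.ingest.v2.interfaces import ProcessorConfig
from unstructured.ingest.v2.pipeline.pipeline import Pipeline

pipeline = Pipeline(
    context=ProcessorConfig(),      # work_dir defaults to ~/.cache/unstructured/ingest/pipeline
    indexer=source.indexer,         # provided by the source connector built in example.py
    downloader=source.downloader,   # provided by the source connector built in example.py
    partitioner=partitioner,        # the Partitioner built in example.py
    # uploader omitted: it now defaults to LocalUploader, which writes results
    # to ./structured-output via LocalUploaderConfig's default output_directory.
)
print(pipeline)   # the new __str__ joins step identifiers, e.g. "index -> download -> partition -> upload"
pipeline.run()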
