Add optional limit on connections when using asyncio
rbiseck3 committed May 15, 2024
1 parent 29d8b72 commit cc14e48
Showing 9 changed files with 58 additions and 27 deletions.
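
Each async pipeline step in this commit wraps its run_async call in an optional asyncio.Semaphore stored on the processor context. A minimal, self-contained sketch of that pattern (the fetch coroutine, the limit of 3, and the ten tasks are illustrative and not part of the commit):

import asyncio
from typing import Optional

async def fetch(i: int) -> str:
    # Stand-in for a connection-bound call such as partitioning or embedding.
    await asyncio.sleep(0.1)
    return f"result-{i}"

async def main(max_connections: Optional[int] = 3) -> list:
    # Mirror of ProcessorConfig: only build a semaphore when a limit is set.
    semaphore = asyncio.Semaphore(max_connections) if max_connections else None

    async def guarded(i: int) -> str:
        # Same if/else guard the pipeline steps apply around run_async().
        if semaphore:
            async with semaphore:
                return await fetch(i)
        return await fetch(i)

    # At most max_connections calls are in flight at any moment.
    return await asyncio.gather(*(guarded(i) for i in range(10)))

if __name__ == "__main__":
    print(asyncio.run(main()))
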
7 changes: 7 additions & 0 deletions unstructured/ingest/v2/interfaces/processor.py
@@ -1,4 +1,5 @@
import os
from asyncio import Semaphore
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
@@ -14,6 +15,7 @@ class ProcessorConfig(EnhancedDataClassJsonMixin):
verbose: bool = False
work_dir: str = field(default_factory=lambda: DEFAULT_WORK_DIR)
num_processes: int = 2
max_connections: Optional[int] = None
raise_on_error: bool = False
disable_parallelism: bool = field(
default_factory=lambda: os.getenv("INGEST_DISABLE_PARALLELISM", "false").lower() == "true"
@@ -26,3 +28,8 @@ class ProcessorConfig(EnhancedDataClassJsonMixin):

# Used to keep track of state in pipeline
status: dict = field(default_factory=dict)
semaphore: Optional[Semaphore] = field(init=False, default=None)

def __post_init__(self):
if self.max_connections is not None:
self.semaphore = Semaphore(self.max_connections)
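
With this field in place, setting max_connections on the processor config builds the shared semaphore in __post_init__. A small usage sketch, assuming the remaining ProcessorConfig fields keep the defaults shown above:

from unstructured.ingest.v2.interfaces.processor import ProcessorConfig

# Illustrative only: cap connection-bound async steps at 5 concurrent calls.
config = ProcessorConfig(max_connections=5)
assert config.semaphore is not None  # created by __post_init__

# Leaving max_connections unset keeps the previous unbounded behavior.
unbounded = ProcessorConfig()
assert unbounded.semaphore is None
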
6 changes: 1 addition & 5 deletions unstructured/ingest/v2/pipeline/interfaces.py
@@ -1,7 +1,7 @@
import asyncio
import logging
import multiprocessing as mp
from abc import ABC, abstractmethod
from abc import ABC
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Optional, TypeVar
@@ -103,10 +103,6 @@ async def run_async(self, *args, **kwargs) -> Optional[Any]:
raise e
return None

@abstractmethod
def get_hash(self, extras: Optional[list[str]]) -> str:
pass

@property
def cache_dir(self) -> Path:
return Path(self.context.work_dir) / self.identifier
6 changes: 5 additions & 1 deletion unstructured/ingest/v2/pipeline/steps/chunk.py
@@ -74,7 +74,11 @@ async def _run_async(self, path: str, file_data_path: str) -> ChunkStepResponse:
if not self.should_chunk(filepath=output_filepath, file_data=file_data):
logger.debug(f"Skipping chunking, output already exists: {output_filepath}")
return ChunkStepResponse(file_data_path=file_data_path, path=str(output_filepath))
chunked_content_raw = await self.process.run_async(elements_filepath=path)
if semaphore := self.context.semaphore:
async with semaphore:
chunked_content_raw = await self.process.run_async(elements_filepath=path)
else:
chunked_content_raw = await self.process.run_async(elements_filepath=path)
self._save_output(
output_filepath=str(output_filepath),
chunked_content=elements_to_dicts(chunked_content_raw),
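
The same acquire-or-fall-through guard is repeated in the download, embed, partition, stage, uncompress, and upload steps below. A hypothetical helper, not part of this commit, could factor it out:

from asyncio import Semaphore
from typing import Awaitable, Optional, TypeVar

T = TypeVar("T")

async def limited(semaphore: Optional[Semaphore], coro: Awaitable[T]) -> T:
    # Acquire the context semaphore only when a connection limit was configured.
    if semaphore:
        async with semaphore:
            return await coro
    return await coro

# Each step body could then collapse to, e.g.:
# chunked_content_raw = await limited(
#     self.context.semaphore, self.process.run_async(elements_filepath=path)
# )
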
7 changes: 5 additions & 2 deletions unstructured/ingest/v2/pipeline/steps/download.py
@@ -85,8 +85,11 @@ async def _run_async(self, file_data_path: str) -> list[DownloadStepResponse]:
if not self.should_download(file_data=file_data, file_data_path=file_data_path):
logger.debug(f"Skipping download, file already exists locally: {download_path}")
return [DownloadStepResponse(file_data_path=file_data_path, path=str(download_path))]

download_path = await self.process.run_async(file_data=file_data)
if semaphore := self.context.semaphore:
async with semaphore:
download_path = await self.process.run_async(file_data=file_data)
else:
download_path = await self.process.run_async(file_data=file_data)
return [DownloadStepResponse(file_data_path=file_data_path, path=str(download_path))]

def get_hash(self, extras: Optional[list[str]]) -> str:
7 changes: 6 additions & 1 deletion unstructured/ingest/v2/pipeline/steps/embed.py
@@ -75,7 +75,12 @@ async def _run_async(self, path: str, file_data_path: str) -> EmbedStepResponse:
if not self.should_embed(filepath=output_filepath, file_data=file_data):
logger.debug(f"Skipping embedding, output already exists: {output_filepath}")
return EmbedStepResponse(file_data_path=file_data_path, path=str(output_filepath))
embed_content_raw = await self.process.run_async(elements_filepath=path)
if semaphore := self.context.semaphore:
async with semaphore:
embed_content_raw = await self.process.run_async(elements_filepath=path)
else:
embed_content_raw = await self.process.run_async(elements_filepath=path)

self._save_output(
output_filepath=str(output_filepath),
embedded_content=elements_to_dicts(embed_content_raw),
12 changes: 9 additions & 3 deletions unstructured/ingest/v2/pipeline/steps/partition.py
@@ -68,9 +68,15 @@ async def _run_async(self, path: str, file_data_path: str) -> PartitionStepResponse:
if not self.should_partition(filepath=output_filepath, file_data=file_data):
logger.debug(f"Skipping partitioning, output already exists: {output_filepath}")
return PartitionStepResponse(file_data_path=file_data_path, path=str(output_filepath))
partitioned_content = await self.process.run_async(
filename=path, metadata=file_data.metadata
)
if semaphore := self.context.semaphore:
async with semaphore:
partitioned_content = await self.process.run_async(
filename=path, metadata=file_data.metadata
)
else:
partitioned_content = await self.process.run_async(
filename=path, metadata=file_data.metadata
)
self._save_output(
output_filepath=str(output_filepath), partitioned_content=partitioned_content
)
12 changes: 9 additions & 3 deletions unstructured/ingest/v2/pipeline/steps/stage.py
@@ -41,7 +41,13 @@ def _run(self, path: str, file_data_path: str) -> UploadStageStepResponse:

async def _run_async(self, path: str, file_data_path: str) -> UploadStageStepResponse:
path = Path(path)
staged_output_path = await self.process.run_async(
elements_filepath=path, file_data=FileData.from_file(path=file_data_path)
)
if semaphore := self.context.semaphore:
async with semaphore:
staged_output_path = await self.process.run_async(
elements_filepath=path, file_data=FileData.from_file(path=file_data_path)
)
else:
staged_output_path = await self.process.run_async(
elements_filepath=path, file_data=FileData.from_file(path=file_data_path)
)
return UploadStageStepResponse(file_data_path=file_data_path, path=str(staged_output_path))
11 changes: 6 additions & 5 deletions unstructured/ingest/v2/pipeline/steps/uncompress.py
@@ -1,6 +1,6 @@
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, TypedDict
from typing import TypedDict

from unstructured.ingest.v2.interfaces.file_data import FileData
from unstructured.ingest.v2.logger import logger
@@ -29,9 +29,6 @@ def __post_init__(self):
)
logger.info(f"Created {self.identifier} with configs: {config}")

def get_hash(self, extras: Optional[list[str]]) -> str:
pass

def _run(self, path: str, file_data_path: str) -> list[UncompressStepResponse]:
file_data = FileData.from_file(path=file_data_path)
new_file_data = self.process.run(file_data=file_data)
@@ -49,7 +46,11 @@ def _run(self, path: str, file_data_path: str) -> list[UncompressStepResponse]:

async def _run_async(self, path: str, file_data_path: str) -> list[UncompressStepResponse]:
file_data = FileData.from_file(path=file_data_path)
new_file_data = await self.process.run_async(file_data=file_data)
if semaphore := self.context.semaphore:
async with semaphore:
new_file_data = await self.process.run_async(file_data=file_data)
else:
new_file_data = await self.process.run_async(file_data=file_data)
responses = []
for new_file in new_file_data:
new_file_data_path = Path(file_data_path).parent / f"{new_file.identifier}.json"
17 changes: 10 additions & 7 deletions unstructured/ingest/v2/pipeline/steps/upload.py
@@ -1,7 +1,7 @@
import asyncio
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, TypedDict
from typing import TypedDict

from unstructured.ingest.v2.interfaces import FileData
from unstructured.ingest.v2.interfaces.uploader import UploadContent, Uploader
@@ -60,9 +60,6 @@ def __call__(self, iterable: iterable_input):
else:
self.process_whole(iterable=iterable)

def get_hash(self, extras: Optional[list[str]]) -> str:
pass

def _run(self, contents: list[UploadStepContent]):
upload_contents = [
UploadContent(path=Path(c["path"]), file_data=FileData.from_file(c["file_data_path"]))
@@ -71,6 +68,12 @@ def _run(self, contents: list[UploadStepContent]):
self.process.run(contents=upload_contents)

async def _run_async(self, path: str, file_data_path: str):
await self.process.run_async(
path=Path(path), file_data=FileData.from_file(path=file_data_path)
)
if semaphore := self.context.semaphore:
async with semaphore:
await self.process.run_async(
path=Path(path), file_data=FileData.from_file(path=file_data_path)
)
else:
await self.process.run_async(
path=Path(path), file_data=FileData.from_file(path=file_data_path)
)
