Skip to content

Commit

Permalink
print all errors at the end of pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
rbiseck3 committed May 15, 2024
1 parent 904dea9 commit 29d8b72
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 12 deletions.
5 changes: 4 additions & 1 deletion unstructured/ingest/v2/cli/base/cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,16 @@ def get_processor_config(options: dict[str, Any]) -> ProcessorConfig:

@staticmethod
def get_indexer(src: str, options: dict[str, Any]) -> IndexerT:
print(f"options: {options}")
source_entry = source_registry[src]
indexer_kwargs: dict[str, Any] = {}
if indexer_config_cls := source_entry.indexer_config:
indexer_kwargs["index_config"] = extract_config(
flat_data=options, config=indexer_config_cls
)
if connection_config_cls := source_entry.connection_config:
indexer_kwargs["connection_config"] = extract_config(
flat_data=options, config=connection_config_cls
)
indexer_cls = source_entry.indexer
return indexer_cls(**indexer_kwargs)

Expand Down
3 changes: 3 additions & 0 deletions unstructured/ingest/v2/interfaces/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,6 @@ class ProcessorConfig(EnhancedDataClassJsonMixin):
max_docs: Optional[int] = None
re_download: bool = False
uncompress: bool = False

# Used to keep track of state in pipeline
status: dict = field(default_factory=dict)
10 changes: 8 additions & 2 deletions unstructured/ingest/v2/pipeline/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ def process_multiprocess(self, iterable: iterable_input) -> Any:
if iterable:
if len(iterable) == 1:
return [self.run(**iterable[0])]
if self.context.num_processes == 1:
return self.process_serially(iterable)
with mp.Pool(
processes=self.context.num_processes,
initializer=self._set_log_level,
Expand Down Expand Up @@ -83,7 +85,9 @@ def run(self, *args, **kwargs) -> Optional[Any]:
try:
return self._run(*args, **kwargs)
except Exception as e:
logger.error("Exception raised while running pipeline", exc_info=e)
logger.error(f"Exception raised while running {self.identifier}", exc_info=e)
if "file_data_path" in kwargs:
self.context.status[kwargs["file_data_path"]] = {self.identifier: str(e)}
if self.context.raise_on_error:
raise e
return None
Expand All @@ -92,7 +96,9 @@ async def run_async(self, *args, **kwargs) -> Optional[Any]:
try:
return await self._run_async(*args, **kwargs)
except Exception as e:
logger.error("Exception raised while running pipeline", exc_info=e)
logger.error(f"Exception raised while running {self.identifier}", exc_info=e)
if "file_data_path" in kwargs:
self.context.status[kwargs["file_data_path"]] = {self.identifier: str(e)}
if self.context.raise_on_error:
raise e
return None
Expand Down
33 changes: 30 additions & 3 deletions unstructured/ingest/v2/pipeline/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging
import multiprocessing as mp
from dataclasses import InitVar, dataclass, field
from typing import Any, Optional, Union

from unstructured.ingest.v2.interfaces import ProcessorConfig
from unstructured.ingest.v2.logger import logger
Expand Down Expand Up @@ -58,14 +60,37 @@ def __post_init__(
def cleanup(self):
pass

def log_statuses(self):
if status := self.context.status:
logger.error(f"{len(status)} failed documents:")
for k, v in status.items():
for kk, vv in v.items():
logger.error(f"{k}: [{kk}] {vv}")

def run(self):
try:
self._run()
finally:
self.log_statuses()
self.cleanup()

def clean_results(self, results: Optional[list[Union[Any, list[Any]]]]) -> Optional[list[Any]]:
if not results:
return None
results = [r for r in results if r]
flat = []
for r in results:
if isinstance(r, list):
flat.extend(r)
else:
flat.append(r)
final = [f for f in flat if f]
return final or None

def _run(self):
logger.info(f"Running local pipline: {self}")
manager = mp.Manager()
self.context.status = manager.dict()

# Index into data source
indices = self.indexer_step.run()
Expand All @@ -75,27 +100,29 @@ def _run(self):

# Download associated content to local file system
downloaded_data = self.downloader_step(indices_inputs)
# Flatten list of lists
downloaded_data = [x for xs in downloaded_data for x in xs]
downloaded_data = self.clean_results(results=downloaded_data)
if not downloaded_data:
return

# Run uncompress if available
if self.uncompress_step:
downloaded_data = self.uncompress_step(downloaded_data)
# Flatten list of lists
downloaded_data = [x for xs in downloaded_data for x in xs]
downloaded_data = self.clean_results(results=downloaded_data)

if not downloaded_data:
return

# Partition content
elements = self.partitioner_step(downloaded_data)
elements = self.clean_results(results=elements)
if not elements:
return

# Run element specific modifiers
for step in [self.chunker_step, self.embedder_step, self.stager_step]:
elements = step(elements) if step else elements
elements = self.clean_results(results=elements)
if not elements:
return

Expand Down
18 changes: 12 additions & 6 deletions unstructured/ingest/v2/processes/connectors/fsspec/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import json
import os
from contextlib import suppress
from dataclasses import InitVar, dataclass, field
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from time import time
Expand Down Expand Up @@ -32,9 +32,14 @@
CONNECTOR_TYPE = "fsspec"


class Base(object):
def __post_init__(self):
pass


@dataclass
class FileConfig:
remote_url: InitVar[str]
class FileConfig(Base):
remote_url: str
protocol: str = field(init=False)
path_without_protocol: str = field(init=False)
supported_protocols: list[str] = field(
Expand All @@ -51,8 +56,9 @@ class FileConfig:
]
)

def __post_init__(self, remote_url: str):
self.protocol, self.path_without_protocol = remote_url.split("://")
def __post_init__(self):
super().__post_init__()
self.protocol, self.path_without_protocol = self.remote_url.split("://")
if self.protocol not in self.supported_protocols:
raise ValueError(
"Protocol {} not supported yet, only {} are supported.".format(
Expand Down Expand Up @@ -204,7 +210,7 @@ def __post_init__(self):
)

def get_download_path(self, file_data: FileData) -> Path:
return self.download_config.download_dir / file_data.source_identifiers.rel_path
return self.download_config.download_dir / Path(file_data.source_identifiers.rel_path)

@staticmethod
def is_float(value: str):
Expand Down

0 comments on commit 29d8b72

Please sign in to comment.