From 29d8b729dd9a046cae05c5ad41109c6a42172ec7 Mon Sep 17 00:00:00 2001 From: Roman Isecke Date: Wed, 15 May 2024 14:51:26 -0400 Subject: [PATCH] print all errors at the end of pipeline --- unstructured/ingest/v2/cli/base/cmd.py | 5 ++- .../ingest/v2/interfaces/processor.py | 3 ++ unstructured/ingest/v2/pipeline/interfaces.py | 10 ++++-- unstructured/ingest/v2/pipeline/pipeline.py | 33 +++++++++++++++++-- .../v2/processes/connectors/fsspec/fsspec.py | 18 ++++++---- 5 files changed, 57 insertions(+), 12 deletions(-) diff --git a/unstructured/ingest/v2/cli/base/cmd.py b/unstructured/ingest/v2/cli/base/cmd.py index 28b26c11ad..089b4fa2b7 100644 --- a/unstructured/ingest/v2/cli/base/cmd.py +++ b/unstructured/ingest/v2/cli/base/cmd.py @@ -102,13 +102,16 @@ def get_processor_config(options: dict[str, Any]) -> ProcessorConfig: @staticmethod def get_indexer(src: str, options: dict[str, Any]) -> IndexerT: - print(f"options: {options}") source_entry = source_registry[src] indexer_kwargs: dict[str, Any] = {} if indexer_config_cls := source_entry.indexer_config: indexer_kwargs["index_config"] = extract_config( flat_data=options, config=indexer_config_cls ) + if connection_config_cls := source_entry.connection_config: + indexer_kwargs["connection_config"] = extract_config( + flat_data=options, config=connection_config_cls + ) indexer_cls = source_entry.indexer return indexer_cls(**indexer_kwargs) diff --git a/unstructured/ingest/v2/interfaces/processor.py b/unstructured/ingest/v2/interfaces/processor.py index 3457058882..fa95998860 100644 --- a/unstructured/ingest/v2/interfaces/processor.py +++ b/unstructured/ingest/v2/interfaces/processor.py @@ -23,3 +23,6 @@ class ProcessorConfig(EnhancedDataClassJsonMixin): max_docs: Optional[int] = None re_download: bool = False uncompress: bool = False + + # Used to keep track of state in pipeline + status: dict = field(default_factory=dict) diff --git a/unstructured/ingest/v2/pipeline/interfaces.py b/unstructured/ingest/v2/pipeline/interfaces.py index 1194086953..d32ca3d055 100644 --- a/unstructured/ingest/v2/pipeline/interfaces.py +++ b/unstructured/ingest/v2/pipeline/interfaces.py @@ -45,6 +45,8 @@ def process_multiprocess(self, iterable: iterable_input) -> Any: if iterable: if len(iterable) == 1: return [self.run(**iterable[0])] + if self.context.num_processes == 1: + return self.process_serially(iterable) with mp.Pool( processes=self.context.num_processes, initializer=self._set_log_level, @@ -83,7 +85,9 @@ def run(self, *args, **kwargs) -> Optional[Any]: try: return self._run(*args, **kwargs) except Exception as e: - logger.error("Exception raised while running pipeline", exc_info=e) + logger.error(f"Exception raised while running {self.identifier}", exc_info=e) + if "file_data_path" in kwargs: + self.context.status[kwargs["file_data_path"]] = {self.identifier: str(e)} if self.context.raise_on_error: raise e return None @@ -92,7 +96,9 @@ async def run_async(self, *args, **kwargs) -> Optional[Any]: try: return await self._run_async(*args, **kwargs) except Exception as e: - logger.error("Exception raised while running pipeline", exc_info=e) + logger.error(f"Exception raised while running {self.identifier}", exc_info=e) + if "file_data_path" in kwargs: + self.context.status[kwargs["file_data_path"]] = {self.identifier: str(e)} if self.context.raise_on_error: raise e return None diff --git a/unstructured/ingest/v2/pipeline/pipeline.py b/unstructured/ingest/v2/pipeline/pipeline.py index c726c43bef..89f5c3356c 100644 --- a/unstructured/ingest/v2/pipeline/pipeline.py +++ b/unstructured/ingest/v2/pipeline/pipeline.py @@ -1,5 +1,7 @@ import logging +import multiprocessing as mp from dataclasses import InitVar, dataclass, field +from typing import Any, Optional, Union from unstructured.ingest.v2.interfaces import ProcessorConfig from unstructured.ingest.v2.logger import logger @@ -58,14 +60,37 @@ def __post_init__( def cleanup(self): pass + def log_statuses(self): + if status := self.context.status: + logger.error(f"{len(status)} failed documents:") + for k, v in status.items(): + for kk, vv in v.items(): + logger.error(f"{k}: [{kk}] {vv}") + def run(self): try: self._run() finally: + self.log_statuses() self.cleanup() + def clean_results(self, results: Optional[list[Union[Any, list[Any]]]]) -> Optional[list[Any]]: + if not results: + return None + results = [r for r in results if r] + flat = [] + for r in results: + if isinstance(r, list): + flat.extend(r) + else: + flat.append(r) + final = [f for f in flat if f] + return final or None + def _run(self): logger.info(f"Running local pipline: {self}") + manager = mp.Manager() + self.context.status = manager.dict() # Index into data source indices = self.indexer_step.run() @@ -75,27 +100,29 @@ def _run(self): # Download associated content to local file system downloaded_data = self.downloader_step(indices_inputs) - # Flatten list of lists - downloaded_data = [x for xs in downloaded_data for x in xs] + downloaded_data = self.clean_results(results=downloaded_data) if not downloaded_data: return + # Run uncompress if available if self.uncompress_step: downloaded_data = self.uncompress_step(downloaded_data) # Flatten list of lists - downloaded_data = [x for xs in downloaded_data for x in xs] + downloaded_data = self.clean_results(results=downloaded_data) if not downloaded_data: return # Partition content elements = self.partitioner_step(downloaded_data) + elements = self.clean_results(results=elements) if not elements: return # Run element specific modifiers for step in [self.chunker_step, self.embedder_step, self.stager_step]: elements = step(elements) if step else elements + elements = self.clean_results(results=elements) if not elements: return diff --git a/unstructured/ingest/v2/processes/connectors/fsspec/fsspec.py b/unstructured/ingest/v2/processes/connectors/fsspec/fsspec.py index 0777a5bc34..22e798ec52 100644 --- a/unstructured/ingest/v2/processes/connectors/fsspec/fsspec.py +++ b/unstructured/ingest/v2/processes/connectors/fsspec/fsspec.py @@ -2,7 +2,7 @@ import json import os from contextlib import suppress -from dataclasses import InitVar, dataclass, field +from dataclasses import dataclass, field from datetime import datetime from pathlib import Path from time import time @@ -32,9 +32,14 @@ CONNECTOR_TYPE = "fsspec" +class Base(object): + def __post_init__(self): + pass + + @dataclass -class FileConfig: - remote_url: InitVar[str] +class FileConfig(Base): + remote_url: str protocol: str = field(init=False) path_without_protocol: str = field(init=False) supported_protocols: list[str] = field( @@ -51,8 +56,9 @@ class FileConfig: ] ) - def __post_init__(self, remote_url: str): - self.protocol, self.path_without_protocol = remote_url.split("://") + def __post_init__(self): + super().__post_init__() + self.protocol, self.path_without_protocol = self.remote_url.split("://") if self.protocol not in self.supported_protocols: raise ValueError( "Protocol {} not supported yet, only {} are supported.".format( @@ -204,7 +210,7 @@ def __post_init__(self): ) def get_download_path(self, file_data: FileData) -> Path: - return self.download_config.download_dir / file_data.source_identifiers.rel_path + return self.download_config.download_dir / Path(file_data.source_identifiers.rel_path) @staticmethod def is_float(value: str):