From eaf6746d2eca3c25bce033022b06e45d434d683e Mon Sep 17 00:00:00 2001 From: Roman Isecke Date: Wed, 4 Oct 2023 17:22:22 -0400 Subject: [PATCH] lint fixes --- unstructured/ingest/cli/cmds/airtable.py | 2 +- unstructured/ingest/cli/cmds/azure.py | 2 +- unstructured/ingest/cli/cmds/biomed.py | 2 +- unstructured/ingest/cli/cmds/box.py | 2 +- unstructured/ingest/cli/cmds/confluence.py | 2 +- unstructured/ingest/cli/cmds/delta_table.py | 2 +- unstructured/ingest/cli/cmds/discord.py | 2 +- unstructured/ingest/cli/cmds/dropbox.py | 2 +- unstructured/ingest/cli/cmds/elasticsearch.py | 2 +- unstructured/ingest/cli/cmds/fsspec.py | 2 +- unstructured/ingest/cli/cmds/gcs.py | 2 +- unstructured/ingest/cli/cmds/github.py | 2 +- unstructured/ingest/cli/cmds/gitlab.py | 2 +- unstructured/ingest/cli/cmds/google_drive.py | 2 +- unstructured/ingest/cli/cmds/jira.py | 2 +- unstructured/ingest/cli/cmds/local.py | 2 +- unstructured/ingest/cli/cmds/notion.py | 2 +- unstructured/ingest/cli/cmds/onedrive.py | 2 +- unstructured/ingest/cli/cmds/outlook.py | 2 +- unstructured/ingest/cli/cmds/reddit.py | 2 +- unstructured/ingest/cli/cmds/s3.py | 4 +- unstructured/ingest/cli/cmds/salesforce.py | 2 +- unstructured/ingest/cli/cmds/sharepoint.py | 2 +- unstructured/ingest/cli/cmds/slack.py | 2 +- unstructured/ingest/cli/cmds/wikipedia.py | 2 +- unstructured/ingest/cli/utils.py | 6 +- unstructured/ingest/connector/biomed.py | 4 +- unstructured/ingest/connector/fsspec.py | 8 +- unstructured/ingest/doc_processor/__init__.py | 0 .../ingest/doc_processor/generalized.py | 70 ----------------- unstructured/ingest/pipeline/__init__.py | 3 +- unstructured/ingest/pipeline/doc_factory.py | 2 +- unstructured/ingest/pipeline/interfaces.py | 24 +++--- unstructured/ingest/pipeline/pipeline.py | 8 +- .../ingest/pipeline/reformat/chunking.py | 3 +- .../ingest/pipeline/reformat/embedding.py | 3 +- .../ingest/pipeline/sample_pipeline.py | 75 ------------------- unstructured/ingest/processor.py | 3 +- 38 files changed, 61 insertions(+), 200 deletions(-) delete mode 100644 unstructured/ingest/doc_processor/__init__.py delete mode 100644 unstructured/ingest/doc_processor/generalized.py delete mode 100644 unstructured/ingest/pipeline/sample_pipeline.py diff --git a/unstructured/ingest/cli/cmds/airtable.py b/unstructured/ingest/cli/cmds/airtable.py index ed7a7a3e7a..b6f07d1cb7 100644 --- a/unstructured/ingest/cli/cmds/airtable.py +++ b/unstructured/ingest/cli/cmds/airtable.py @@ -82,7 +82,7 @@ def airtable_source(ctx: click.Context, **options): try: configs = extract_configs(options, validate=[AirtableCliConfig]) runner = Airtable( - **configs, + **configs, # type: ignore ) runner.run(**options) except Exception as e: diff --git a/unstructured/ingest/cli/cmds/azure.py b/unstructured/ingest/cli/cmds/azure.py index 37f5d96f48..fdffc5d5da 100644 --- a/unstructured/ingest/cli/cmds/azure.py +++ b/unstructured/ingest/cli/cmds/azure.py @@ -60,7 +60,7 @@ def azure_source(ctx: click.Context, **options): try: configs = extract_configs(options, validate=[AzureCliConfig]) runner = Azure( - **configs, + **configs, # type: ignore ) runner.run(**options) except Exception as e: diff --git a/unstructured/ingest/cli/cmds/biomed.py b/unstructured/ingest/cli/cmds/biomed.py index 6e1d50f49d..f72e5a81e5 100644 --- a/unstructured/ingest/cli/cmds/biomed.py +++ b/unstructured/ingest/cli/cmds/biomed.py @@ -81,7 +81,7 @@ def biomed_source(ctx: click.Context, **options): try: configs = extract_configs(options, validate=[BiomedCliConfig]) runner = Biomed( - **configs, + **configs, # type: ignore ) runner.run(**options) except Exception as e: diff --git a/unstructured/ingest/cli/cmds/box.py b/unstructured/ingest/cli/cmds/box.py index debd7f0afc..acf088827c 100644 --- a/unstructured/ingest/cli/cmds/box.py +++ b/unstructured/ingest/cli/cmds/box.py @@ -47,7 +47,7 @@ def box_source(ctx: click.Context, **options): try: configs = extract_configs(options, validate=[BoxCliConfig]) runner = Box( - **configs, + **configs, # type: ignore ) runner.run(**options) except Exception as e: diff --git a/unstructured/ingest/cli/cmds/confluence.py b/unstructured/ingest/cli/cmds/confluence.py index 0fa3082536..27b5b243c1 100644 --- a/unstructured/ingest/cli/cmds/confluence.py +++ b/unstructured/ingest/cli/cmds/confluence.py @@ -87,7 +87,7 @@ def confluence_source(ctx: click.Context, **options): try: configs = extract_configs(options, validate=[ConfluenceCliConfig]) runner = Confluence( - **configs, + **configs, # type: ignore ) runner.run(**options) except Exception as e: diff --git a/unstructured/ingest/cli/cmds/delta_table.py b/unstructured/ingest/cli/cmds/delta_table.py index d67ce9f128..5338d8b6a2 100644 --- a/unstructured/ingest/cli/cmds/delta_table.py +++ b/unstructured/ingest/cli/cmds/delta_table.py @@ -67,7 +67,7 @@ def delta_table_source(ctx: click.Context, **options): try: configs = extract_configs(options, validate=[DeltaTableCliConfig]) runner = DeltaTable( - **configs, + **configs, # type: ignore ) runner.run(**options) except Exception as e: diff --git a/unstructured/ingest/cli/cmds/discord.py b/unstructured/ingest/cli/cmds/discord.py index 7a00fc33e2..e15474a419 100644 --- a/unstructured/ingest/cli/cmds/discord.py +++ b/unstructured/ingest/cli/cmds/discord.py @@ -61,7 +61,7 @@ def discord_source(ctx: click.Context, **options): try: configs = extract_configs(options, validate=[DiscordCliConfig]) runner = Discord( - **configs, + **configs, # type: ignore ) runner.run(**options) except Exception as e: diff --git a/unstructured/ingest/cli/cmds/dropbox.py b/unstructured/ingest/cli/cmds/dropbox.py index 1c41a4a8b6..768143843b 100644 --- a/unstructured/ingest/cli/cmds/dropbox.py +++ b/unstructured/ingest/cli/cmds/dropbox.py @@ -46,7 +46,7 @@ def dropbox_source(ctx: click.Context, **options): try: configs = extract_configs(options, validate=[DropboxCliConfig]) runner = Dropbox( - **configs, + **configs, # type: ignore ) runner.run(**options) except Exception as e: diff --git a/unstructured/ingest/cli/cmds/elasticsearch.py b/unstructured/ingest/cli/cmds/elasticsearch.py index a80920576b..111e2c0fc1 100644 --- a/unstructured/ingest/cli/cmds/elasticsearch.py +++ b/unstructured/ingest/cli/cmds/elasticsearch.py @@ -63,7 +63,7 @@ def elasticsearch_source(ctx: click.Context, **options): try: configs = extract_configs(options, validate=[ElasticsearchCliConfig]) runner = ElasticSearch( - **configs, + **configs, # type: ignore ) runner.run(**options) except Exception as e: diff --git a/unstructured/ingest/cli/cmds/fsspec.py b/unstructured/ingest/cli/cmds/fsspec.py index a47673ac51..6d5c09b93a 100644 --- a/unstructured/ingest/cli/cmds/fsspec.py +++ b/unstructured/ingest/cli/cmds/fsspec.py @@ -27,7 +27,7 @@ def fsspec_source(ctx: click.Context, **options): try: configs = extract_configs(options) runner = Fsspec( - **configs, + **configs, # type: ignore ) runner.run(**options) except Exception as e: diff --git a/unstructured/ingest/cli/cmds/gcs.py b/unstructured/ingest/cli/cmds/gcs.py index 663d8ed085..d9c39f334f 100644 --- a/unstructured/ingest/cli/cmds/gcs.py +++ b/unstructured/ingest/cli/cmds/gcs.py @@ -49,7 +49,7 @@ def gcs_source(ctx: click.Context, **options): try: configs = extract_configs(options, validate=([GcsCliConfig])) runner = GCS( - **configs, + **configs, # type: ignore ) runner.run(**options) except Exception as e: diff --git a/unstructured/ingest/cli/cmds/github.py b/unstructured/ingest/cli/cmds/github.py index cbdb6818a9..1aac7fa9b1 100644 --- a/unstructured/ingest/cli/cmds/github.py +++ b/unstructured/ingest/cli/cmds/github.py @@ -72,7 +72,7 @@ def github_source(ctx: click.Context, **options): try: configs = extract_configs(options, validate=([GithubCliConfig])) runner = Github( - **configs, + **configs, # type: ignore ) runner.run(**options) except Exception as e: diff --git a/unstructured/ingest/cli/cmds/gitlab.py b/unstructured/ingest/cli/cmds/gitlab.py index a4564e6b05..c508f41472 100644 --- a/unstructured/ingest/cli/cmds/gitlab.py +++ b/unstructured/ingest/cli/cmds/gitlab.py @@ -72,7 +72,7 @@ def gitlab_source(ctx: click.Context, **options): try: configs = extract_configs(options, validate=([GitlabCliConfig])) runner = Gitlab( - **configs, + **configs, # type: ignore ) runner.run(**options) except Exception as e: diff --git a/unstructured/ingest/cli/cmds/google_drive.py b/unstructured/ingest/cli/cmds/google_drive.py index 9d3f478913..c6b119a0be 100644 --- a/unstructured/ingest/cli/cmds/google_drive.py +++ b/unstructured/ingest/cli/cmds/google_drive.py @@ -61,7 +61,7 @@ def google_drive_source(ctx: click.Context, **options): try: configs = extract_configs(options, validate=([GoogleDriveCliConfig])) runner = GoogleDrive( - **configs, + **configs, # type: ignore ) runner.run(**options) except Exception as e: diff --git a/unstructured/ingest/cli/cmds/jira.py b/unstructured/ingest/cli/cmds/jira.py index b5d8477909..0d2c014ea6 100644 --- a/unstructured/ingest/cli/cmds/jira.py +++ b/unstructured/ingest/cli/cmds/jira.py @@ -89,7 +89,7 @@ def jira_source(ctx: click.Context, **options): try: configs = extract_configs(options, validate=([JiraCliConfig])) runner = Jira( - **configs, + **configs, # type: ignore ) runner.run(**options) except Exception as e: diff --git a/unstructured/ingest/cli/cmds/local.py b/unstructured/ingest/cli/cmds/local.py index 03efbd0172..ce84c253f4 100644 --- a/unstructured/ingest/cli/cmds/local.py +++ b/unstructured/ingest/cli/cmds/local.py @@ -55,7 +55,7 @@ def local_source(ctx: click.Context, **options): try: configs = extract_configs(options, validate=([LocalCliConfig])) runner = Local( - **configs, + **configs, # type: ignore ) runner.run(**options) except Exception as e: diff --git a/unstructured/ingest/cli/cmds/notion.py b/unstructured/ingest/cli/cmds/notion.py index c35255fe28..10cd40dfee 100644 --- a/unstructured/ingest/cli/cmds/notion.py +++ b/unstructured/ingest/cli/cmds/notion.py @@ -62,7 +62,7 @@ def notion_source(ctx: click.Context, **options): try: configs = extract_configs(options, validate=([NotionCliConfig])) runner = Notion( - **configs, + **configs, # type: ignore ) runner.run(**options) except Exception as e: diff --git a/unstructured/ingest/cli/cmds/onedrive.py b/unstructured/ingest/cli/cmds/onedrive.py index 45fc50dc8f..b7d96dedc2 100644 --- a/unstructured/ingest/cli/cmds/onedrive.py +++ b/unstructured/ingest/cli/cmds/onedrive.py @@ -83,7 +83,7 @@ def onedrive_source(ctx: click.Context, **options): try: configs = extract_configs(options, validate=([OnedriveCliConfig])) runner = OneDrive( - **configs, + **configs, # type: ignore ) runner.run(**options) except Exception as e: diff --git a/unstructured/ingest/cli/cmds/outlook.py b/unstructured/ingest/cli/cmds/outlook.py index bdba5bb178..a08f8de5da 100644 --- a/unstructured/ingest/cli/cmds/outlook.py +++ b/unstructured/ingest/cli/cmds/outlook.py @@ -84,7 +84,7 @@ def outlook_source(ctx: click.Context, **options): try: configs = extract_configs(options, validate=([OutlookCliConfig])) runner = Outlook( - **configs, + **configs, # type: ignore ) runner.run(**options) except Exception as e: diff --git a/unstructured/ingest/cli/cmds/reddit.py b/unstructured/ingest/cli/cmds/reddit.py index 3e9edd5495..8936657cf5 100644 --- a/unstructured/ingest/cli/cmds/reddit.py +++ b/unstructured/ingest/cli/cmds/reddit.py @@ -84,7 +84,7 @@ def reddit_source(ctx: click.Context, **options): try: configs = extract_configs(options, validate=([RedditCliConfig])) runner = Reddit( - **configs, + **configs, # type: ignore ) runner.run(**options) except Exception as e: diff --git a/unstructured/ingest/cli/cmds/s3.py b/unstructured/ingest/cli/cmds/s3.py index 01be8dd06a..d8d3f6bdb2 100644 --- a/unstructured/ingest/cli/cmds/s3.py +++ b/unstructured/ingest/cli/cmds/s3.py @@ -59,7 +59,7 @@ def s3_source(ctx: click.Context, **options): try: configs = extract_configs(options, validate=[S3CliConfig]) s3_runner = S3( - **configs, + **configs, # type: ignore ) s3_runner.run(**options) except Exception as e: @@ -79,7 +79,7 @@ def s3_dest(ctx: click.Context, **options): try: configs = extract_configs(options, validate=[S3CliConfig]) s3_runner = S3( - **configs, + **configs, # type: ignore writer_type="s3", writer_kwargs=options, ) diff --git a/unstructured/ingest/cli/cmds/salesforce.py b/unstructured/ingest/cli/cmds/salesforce.py index 6496b54a36..b24c6e97b7 100644 --- a/unstructured/ingest/cli/cmds/salesforce.py +++ b/unstructured/ingest/cli/cmds/salesforce.py @@ -73,7 +73,7 @@ def salesforce_source(ctx: click.Context, **options): try: configs = extract_configs(options, validate=([SalesforceCliConfig])) runner = Salesforce( - **configs, + **configs, # type: ignore ) runner.run(**options) except Exception as e: diff --git a/unstructured/ingest/cli/cmds/sharepoint.py b/unstructured/ingest/cli/cmds/sharepoint.py index 50b8465e78..90015d5094 100644 --- a/unstructured/ingest/cli/cmds/sharepoint.py +++ b/unstructured/ingest/cli/cmds/sharepoint.py @@ -82,7 +82,7 @@ def sharepoint_source(ctx: click.Context, **options): try: configs = extract_configs(data=options, validate=[SharepointCliConfig]) sharepoint_runner = SharePoint( - **configs, + **configs, # type: ignore ) sharepoint_runner.run(**options) except Exception as e: diff --git a/unstructured/ingest/cli/cmds/slack.py b/unstructured/ingest/cli/cmds/slack.py index 6a7101b3ac..3ea7a2ff35 100644 --- a/unstructured/ingest/cli/cmds/slack.py +++ b/unstructured/ingest/cli/cmds/slack.py @@ -72,7 +72,7 @@ def slack_source(ctx: click.Context, **options): try: configs = extract_configs(data=options, validate=[SlackCliConfig]) sharepoint_runner = Slack( - **configs, + **configs, # type: ignore ) sharepoint_runner.run(**options) except Exception as e: diff --git a/unstructured/ingest/cli/cmds/wikipedia.py b/unstructured/ingest/cli/cmds/wikipedia.py index 8ba9264fe9..231709313f 100644 --- a/unstructured/ingest/cli/cmds/wikipedia.py +++ b/unstructured/ingest/cli/cmds/wikipedia.py @@ -53,7 +53,7 @@ def wikipedia_source(ctx: click.Context, **options): try: configs = extract_configs(data=options, validate=[WikipediaCliConfig]) sharepoint_runner = Wikipedia( - **configs, + **configs, # type: ignore ) sharepoint_runner.run(**options) except Exception as e: diff --git a/unstructured/ingest/cli/utils.py b/unstructured/ingest/cli/utils.py index 93cf1bf600..f917bb02c6 100644 --- a/unstructured/ingest/cli/utils.py +++ b/unstructured/ingest/cli/utils.py @@ -23,7 +23,11 @@ def conform_click_options(options: dict): options[k] = list(v) -def extract_configs(data: dict, validate: t.List[t.Type[BaseConfig]]) -> t.Dict[str, BaseConfig]: +def extract_configs( + data: dict, + validate: t.Optional[t.List[t.Type[BaseConfig]]] = None, +) -> t.Dict[str, BaseConfig]: + validate = validate if validate else [] res = { "read_config": CliReadConfig.from_dict(data), "partition_config": CliPartitionConfig.from_dict(data), diff --git a/unstructured/ingest/connector/biomed.py b/unstructured/ingest/connector/biomed.py index 3262589469..d817c4893d 100644 --- a/unstructured/ingest/connector/biomed.py +++ b/unstructured/ingest/connector/biomed.py @@ -164,7 +164,7 @@ def urls_to_metadata(urls): download_filepath=(Path(self.read_config.download_dir) / local_path) .resolve() .as_posix(), - output_filepath=(Path(self.partition_config.output_dir) / local_path) + output_filepath=(Path(self.processor_config.output_dir) / local_path) .resolve() .as_posix(), ), @@ -246,7 +246,7 @@ def traverse(path, download_dir, output_dir): .resolve() .as_posix(), output_filepath=( - Path(self.partition_config.output_dir) / local_path + Path(self.processor_config.output_dir) / local_path ) .resolve() .as_posix(), diff --git a/unstructured/ingest/connector/fsspec.py b/unstructured/ingest/connector/fsspec.py index 73fde8fce2..9952551dec 100644 --- a/unstructured/ingest/connector/fsspec.py +++ b/unstructured/ingest/connector/fsspec.py @@ -3,7 +3,7 @@ import typing as t from contextlib import suppress from dataclasses import dataclass, field -from pathlib import Path +from pathlib import Path, PurePath from unstructured.ingest.error import SourceConnectionError from unstructured.ingest.interfaces import ( @@ -246,11 +246,7 @@ def write(self, docs: t.List[BaseIngestDoc]) -> None: for doc in docs: s3_file_path = doc.base_filename s3_folder = self.connector_config.path - if s3_folder[-1] != "/": - s3_folder = f"{s3_file_path}/" - if s3_file_path[0] == "/": - s3_file_path = s3_file_path[1:] - s3_output_path = s3_folder + s3_file_path + s3_output_path = str(PurePath(s3_folder, s3_file_path)) if s3_file_path else s3_folder logger.debug(f"Uploading {doc._output_filename} -> {s3_output_path}") fs.put_file(lpath=doc._output_filename, rpath=s3_output_path) diff --git a/unstructured/ingest/doc_processor/__init__.py b/unstructured/ingest/doc_processor/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/unstructured/ingest/doc_processor/generalized.py b/unstructured/ingest/doc_processor/generalized.py deleted file mode 100644 index f44b2fa8f4..0000000000 --- a/unstructured/ingest/doc_processor/generalized.py +++ /dev/null @@ -1,70 +0,0 @@ -"""Process arbitrary files with the Unstructured library""" - -import os -from typing import Any, Dict, List, Optional - -from unstructured_inference.models.base import get_model - -from unstructured.ingest.connector.registry import create_ingest_doc_from_json -from unstructured.ingest.interfaces import ( - BaseSessionHandle, - IngestDocSessionHandleMixin, -) -from unstructured.ingest.logger import logger - -# module-level variable to store session handle -session_handle: Optional[BaseSessionHandle] = None - - -def initialize(): - """Download default model or model specified by UNSTRUCTURED_HI_RES_MODEL_NAME environment - variable (avoids subprocesses all doing the same)""" - - # If more than one model will be supported and left up to user selection - supported_model = os.environ.get("UNSTRUCTURED_HI_RES_SUPPORTED_MODEL", "") - if supported_model: - for model_name in supported_model.split(","): - get_model(model_name=model_name) - - get_model(os.environ.get("UNSTRUCTURED_HI_RES_MODEL_NAME")) - - -def process_document(ingest_doc_json: str, **partition_kwargs) -> Optional[List[Dict[str, Any]]]: - """Process the serialized json for any IngestDoc-like class of document with chosen - Unstructured partition logic. - - Parameters - ---------- - partition_kwargs - ultimately the parameters passed to partition() - """ - global session_handle - isd_elems_no_filename = None - doc = None - try: - doc = create_ingest_doc_from_json(ingest_doc_json) - if isinstance(doc, IngestDocSessionHandleMixin): - if session_handle is None: - # create via doc.session_handle, which is a property that creates a - # session handle if one is not already defined - session_handle = doc.session_handle - else: - doc.session_handle = session_handle - # does the work necessary to load file into filesystem - # in the future, get_file_handle() could also be supported - doc.get_file() - - isd_elems_no_filename = doc.process_file(**partition_kwargs) - - # Note, this may be a no-op if the IngestDoc doesn't do anything to persist - # the results. Instead, the Processor (caller) may work with the aggregate - # results across all docs in memory. - doc.write_result() - except Exception: - # TODO(crag) save the exception instead of print? - logger.error(f"Failed to process {doc}") - raise Exception - finally: - if doc: - doc.cleanup_file() - return isd_elems_no_filename diff --git a/unstructured/ingest/pipeline/__init__.py b/unstructured/ingest/pipeline/__init__.py index 103d7bbd10..19d78bdbcd 100644 --- a/unstructured/ingest/pipeline/__init__.py +++ b/unstructured/ingest/pipeline/__init__.py @@ -1,5 +1,5 @@ from .doc_factory import DocFactory -from .interfaces import PipelineContext +from .interfaces import PipelineContext, ReformatNode from .partition import Partitioner from .pipeline import Pipeline from .reformat.chunking import Chunker @@ -16,4 +16,5 @@ "Pipeline", "Writer", "Chunker", + "ReformatNode", ] diff --git a/unstructured/ingest/pipeline/doc_factory.py b/unstructured/ingest/pipeline/doc_factory.py index 6b72cbf9c3..e4a1598f8e 100644 --- a/unstructured/ingest/pipeline/doc_factory.py +++ b/unstructured/ingest/pipeline/doc_factory.py @@ -9,7 +9,7 @@ class DocFactory(DocFactoryNode): def initialize(self): self.source_doc_connector.initialize() - def run(self) -> t.Iterable[str]: + def run(self, *args, **kwargs) -> t.Iterable[str]: docs = self.source_doc_connector.get_ingest_docs() json_docs = [doc.to_json() for doc in docs] return json_docs diff --git a/unstructured/ingest/pipeline/interfaces.py b/unstructured/ingest/pipeline/interfaces.py index a48378ea00..2e11965b39 100644 --- a/unstructured/ingest/pipeline/interfaces.py +++ b/unstructured/ingest/pipeline/interfaces.py @@ -21,14 +21,20 @@ @dataclass class PipelineContext(ProcessorConfig): def __post_init__(self): - self.ingest_docs_map: mp.managers.DictProxy = None + self._ingest_docs_map: t.Optional[mp.managers.DictProxy] = None + + @property + def ingest_docs_map(self) -> mp.managers.DictProxy: + if not self._ingest_docs_map: + raise ValueError("ingest_docs_map never initialized") + return self._ingest_docs_map @dataclass class PipelineNode(DataClassJsonMixin, ABC): pipeline_context: PipelineContext - def __call__(self, iterable: t.Iterable[t.Any] = None): + def __call__(self, iterable: t.Optional[t.Iterable[t.Any]] = None) -> t.Any: iterable = iterable if iterable else [] self.initialize() if not self.supported_multiprocessing(): @@ -42,10 +48,6 @@ def __call__(self, iterable: t.Iterable[t.Any] = None): else: self.result = self.run() else: - logger.info( - f"processing {len(iterable)} items via " - f"{self.pipeline_context.num_processes} processes", - ) with mp.Pool( processes=self.pipeline_context.num_processes, initializer=ingest_log_streaming_init, @@ -58,7 +60,7 @@ def supported_multiprocessing(self) -> bool: return True @abstractmethod - def run(self): + def run(self, *args, **kwargs) -> t.Optional[t.Any]: pass def initialize(self): @@ -83,7 +85,7 @@ def initialize(self): self.source_doc_connector.initialize() @abstractmethod - def run(self) -> t.Iterable[str]: + def run(self, *args, **kwargs) -> t.Iterable[str]: pass def supported_multiprocessing(self) -> bool: @@ -132,12 +134,12 @@ def create_hash(self) -> str: def run(self, json_path: str) -> str: pass - def get_path(self) -> t.Optional[Path]: + def get_path(self) -> Path: return (Path(self.pipeline_context.work_dir) / "partitioned").resolve() @dataclass -class ReformatNode(PipelineNode): +class ReformatNode(PipelineNode, ABC): """ Encapsulated any logic to reformat the output List[Element] content from partition before writing it @@ -173,5 +175,5 @@ def initialize(self): super().initialize() @abstractmethod - def run(self, json_paths: t.List[str]): + def run(self, json_path: str): pass diff --git a/unstructured/ingest/pipeline/pipeline.py b/unstructured/ingest/pipeline/pipeline.py index 3cc0449686..9f649f2de2 100644 --- a/unstructured/ingest/pipeline/pipeline.py +++ b/unstructured/ingest/pipeline/pipeline.py @@ -23,7 +23,7 @@ class Pipeline(DataClassJsonMixin): pipeline_context: PipelineContext doc_factory_node: DocFactoryNode source_node: SourceNode - partition_node: t.Optional[PartitionNode] = None + partition_node: PartitionNode write_node: t.Optional[WriteNode] = None reformat_nodes: t.List[ReformatNode] = field(default_factory=list) @@ -42,8 +42,12 @@ def run(self): logger.info(f"running pipeline: {self.get_nodes_str()}") self.initialize() manager = mp.Manager() - self.pipeline_context.ingest_docs_map = manager.dict() + self.pipeline_context._ingest_docs_map = manager.dict() json_docs = self.doc_factory_node() + logger.info( + f"processing {len(json_docs)} docs via " + f"{self.pipeline_context.num_processes} processes", + ) for doc in json_docs: self.pipeline_context.ingest_docs_map[get_ingest_doc_hash(doc)] = doc self.source_node(iterable=json_docs) diff --git a/unstructured/ingest/pipeline/reformat/chunking.py b/unstructured/ingest/pipeline/reformat/chunking.py index cde55d66d0..99acbff230 100644 --- a/unstructured/ingest/pipeline/reformat/chunking.py +++ b/unstructured/ingest/pipeline/reformat/chunking.py @@ -1,7 +1,6 @@ import hashlib import json import os.path -import typing as t from dataclasses import dataclass from pathlib import Path @@ -50,5 +49,5 @@ def run(self, elements_json: str) -> str: json.dump(elements_dict, output_f, ensure_ascii=False, indent=2) return str(json_path) - def get_path(self) -> t.Optional[Path]: + def get_path(self) -> Path: return (Path(self.pipeline_context.work_dir) / "chunked").resolve() diff --git a/unstructured/ingest/pipeline/reformat/embedding.py b/unstructured/ingest/pipeline/reformat/embedding.py index bff47195d1..ae723b4797 100644 --- a/unstructured/ingest/pipeline/reformat/embedding.py +++ b/unstructured/ingest/pipeline/reformat/embedding.py @@ -1,7 +1,6 @@ import hashlib import json import os.path -import typing as t from dataclasses import dataclass from pathlib import Path @@ -48,5 +47,5 @@ def run(self, elements_json: str) -> str: json.dump(elements_dict, output_f, ensure_ascii=False, indent=2) return str(json_path) - def get_path(self) -> t.Optional[Path]: + def get_path(self) -> Path: return (Path(self.pipeline_context.work_dir) / "embedded").resolve() diff --git a/unstructured/ingest/pipeline/sample_pipeline.py b/unstructured/ingest/pipeline/sample_pipeline.py deleted file mode 100644 index ab05ad3981..0000000000 --- a/unstructured/ingest/pipeline/sample_pipeline.py +++ /dev/null @@ -1,75 +0,0 @@ -from unstructured.ingest.connector.s3 import ( - S3SourceConnector, - SimpleS3Config, -) -from unstructured.ingest.interfaces import ( - ChunkingConfig, - EmbeddingConfig, - PartitionConfig, - ReadConfig, -) -from unstructured.ingest.pipeline import ( - Chunker, - DocFactory, - Embedder, - Partitioner, - Pipeline, - PipelineContext, - Reader, - Writer, -) -from unstructured.ingest.runner.writers import s3_writer - -if __name__ == "__main__": - pipeline_config = PipelineContext(num_processes=1) - read_config = ReadConfig(preserve_downloads=True, download_dir="pipeline-test-output") - partition_config = PartitionConfig(strategy="fast") - page_title = "Open Source Software" - auto_suggest = False - - source_doc_connector = S3SourceConnector( # type: ignore - connector_config=SimpleS3Config( - path="s3://utic-dev-tech-fixtures/small-pdf-set/", - recursive=True, - access_kwargs={"anon": True}, - ), - read_config=read_config, - ) - doc_factory = DocFactory( - pipeline_context=pipeline_config, - source_doc_connector=source_doc_connector, - ) - reader = Reader(pipeline_context=pipeline_config) - partitioner = Partitioner(pipeline_context=pipeline_config, partition_config=partition_config) - embedder = Embedder( - pipeline_context=pipeline_config, - embedder_config=EmbeddingConfig( - api_key="FILL IN", - ), - reprocess=partition_config.reprocess, - ) - chunker = Chunker( - pipeline_context=pipeline_config, - chunking_config=ChunkingConfig( - chunk_elements=True, - new_after_n_chars=1499, - ), - reprocess=partition_config.reprocess, - ) - writer = Writer( - pipeline_context=pipeline_config, - dest_doc_connector=s3_writer( - remote_url="s3://utic-dev-tech-fixtures/small-pdf-set/", - anonymous=True, - ), - ) - pipeline = Pipeline( - verbose=True, - pipeline_context=pipeline_config, - doc_factory_node=doc_factory, - source_node=reader, - partition_node=partitioner, - reformat_nodes=[chunker, embedder], - write_node=writer, - ) - pipeline.run() diff --git a/unstructured/ingest/processor.py b/unstructured/ingest/processor.py index c2a70d0d9b..8d9cd7c3e3 100644 --- a/unstructured/ingest/processor.py +++ b/unstructured/ingest/processor.py @@ -18,6 +18,7 @@ Pipeline, PipelineContext, Reader, + ReformatNode, Writer, ) @@ -40,7 +41,7 @@ def process_documents( ) reader = Reader(pipeline_context=pipeline_config) partitioner = Partitioner(pipeline_context=pipeline_config, partition_config=partition_config) - reformat_nodes = [] + reformat_nodes: t.List[ReformatNode] = [] if embedder_config: reformat_nodes.append( Embedder(