From 51119e6492312c974e1aac7672c001b872866524 Mon Sep 17 00:00:00 2001 From: Roman Isecke Date: Wed, 18 Oct 2023 16:24:00 -0400 Subject: [PATCH] Extract logic to run the runner --- .../ingest/cli/cmds/azure_cognitive_search.py | 26 +++++---------- unstructured/ingest/cli/cmds/delta_table.py | 33 ++++++++----------- unstructured/ingest/cli/cmds/s3.py | 32 ++++++++---------- unstructured/ingest/cli/utils.py | 32 ++++++++++++++++-- 4 files changed, 65 insertions(+), 58 deletions(-) diff --git a/unstructured/ingest/cli/cmds/azure_cognitive_search.py b/unstructured/ingest/cli/cmds/azure_cognitive_search.py index 35c55766e6..ac11da932b 100644 --- a/unstructured/ingest/cli/cmds/azure_cognitive_search.py +++ b/unstructured/ingest/cli/cmds/azure_cognitive_search.py @@ -10,10 +10,11 @@ from unstructured.ingest.cli.interfaces import ( CliMixin, ) -from unstructured.ingest.cli.utils import conform_click_options, extract_configs -from unstructured.ingest.interfaces import BaseConfig, FsspecConfig +from unstructured.ingest.cli.utils import conform_click_options, orchestrate_runner +from unstructured.ingest.interfaces import BaseConfig from unstructured.ingest.logger import ingest_log_streaming_init, logger -from unstructured.ingest.runner import FsspecBaseRunner, runner_map + +pass @dataclass @@ -68,21 +69,12 @@ def azure_cognitive_search_dest(ctx: click.Context, **options): log_options(parent_options, verbose=verbose) log_options(options, verbose=verbose) try: - runner_cls = runner_map[source_cmd] - configs = extract_configs( - parent_options, - extras={"fsspec_config": FsspecConfig} - if issubclass(runner_cls, FsspecBaseRunner) - else None, - ) - runner_cls = runner_map[source_cmd] - runner = runner_cls( - **configs, # type: ignore + orchestrate_runner( + source_cmd=source_cmd, writer_type="azure_cognitive_search", - writer_kwargs=options, - ) - runner.run( - **parent_options, + parent_options=parent_options, + options=options, + validate=[AzureCognitiveSearchCliWriteConfig], ) except Exception as e: logger.error(e, exc_info=True) diff --git a/unstructured/ingest/cli/cmds/delta_table.py b/unstructured/ingest/cli/cmds/delta_table.py index d944f226f2..bb1beb2dbd 100644 --- a/unstructured/ingest/cli/cmds/delta_table.py +++ b/unstructured/ingest/cli/cmds/delta_table.py @@ -10,10 +10,16 @@ from unstructured.ingest.cli.interfaces import ( CliMixin, ) -from unstructured.ingest.cli.utils import Group, add_options, conform_click_options, extract_configs -from unstructured.ingest.interfaces import BaseConfig, FsspecConfig +from unstructured.ingest.cli.utils import ( + Group, + add_options, + conform_click_options, + extract_configs, + orchestrate_runner, +) +from unstructured.ingest.interfaces import BaseConfig from unstructured.ingest.logger import ingest_log_streaming_init, logger -from unstructured.ingest.runner import DeltaTableRunner, FsspecBaseRunner, runner_map +from unstructured.ingest.runner import DeltaTableRunner @dataclass @@ -118,23 +124,12 @@ def delta_table_dest(ctx: click.Context, **options): log_options(parent_options, verbose=verbose) log_options(options, verbose=verbose) try: - runner_cls = runner_map[source_cmd] - configs = extract_configs( - parent_options, - extras={"fsspec_config": FsspecConfig} - if issubclass(runner_cls, FsspecBaseRunner) - else None, - ) - # Validate write configs - DeltaTableCliWriteConfig.from_dict(options) - runner_cls = runner_map[source_cmd] - runner = runner_cls( - **configs, # type: ignore + orchestrate_runner( + source_cmd=source_cmd, writer_type="delta_table", - writer_kwargs=options, - ) - runner.run( - **parent_options, + parent_options=parent_options, + options=options, + validate=[DeltaTableCliWriteConfig], ) except Exception as e: logger.error(e, exc_info=True) diff --git a/unstructured/ingest/cli/cmds/s3.py b/unstructured/ingest/cli/cmds/s3.py index 75ea7630f1..d4d701c051 100644 --- a/unstructured/ingest/cli/cmds/s3.py +++ b/unstructured/ingest/cli/cmds/s3.py @@ -11,10 +11,16 @@ CliFilesStorageConfig, CliMixin, ) -from unstructured.ingest.cli.utils import Group, add_options, conform_click_options, extract_configs +from unstructured.ingest.cli.utils import ( + Group, + add_options, + conform_click_options, + extract_configs, + orchestrate_runner, +) from unstructured.ingest.interfaces import BaseConfig, FsspecConfig from unstructured.ingest.logger import ingest_log_streaming_init, logger -from unstructured.ingest.runner import FsspecBaseRunner, S3Runner, runner_map +from unstructured.ingest.runner import S3Runner @dataclass @@ -85,24 +91,12 @@ def s3_dest(ctx: click.Context, **options): log_options(parent_options, verbose=verbose) log_options(options, verbose=verbose) try: - runner_cls = runner_map[source_cmd] - configs = extract_configs( - parent_options, - validate=[S3CliConfig], - extras={"fsspec_config": FsspecConfig} - if issubclass(runner_cls, FsspecBaseRunner) - else None, - ) - # validate dest cli - S3CliConfig.from_dict(options) - CliFilesStorageConfig.from_dict(options) - runner = runner_cls( - **configs, # type: ignore + orchestrate_runner( + source_cmd=source_cmd, writer_type="s3", - writer_kwargs=options, - ) - runner.run( - **parent_options, + parent_options=parent_options, + options=options, + validate=[S3CliConfig, CliFilesStorageConfig], ) except Exception as e: logger.error(e, exc_info=True) diff --git a/unstructured/ingest/cli/utils.py b/unstructured/ingest/cli/utils.py index 6a03a474b9..faebdb5228 100644 --- a/unstructured/ingest/cli/utils.py +++ b/unstructured/ingest/cli/utils.py @@ -13,9 +13,35 @@ CliReadConfig, CliRetryStrategyConfig, ) -from unstructured.ingest.interfaces import ( - BaseConfig, -) +from unstructured.ingest.interfaces import BaseConfig, FsspecConfig +from unstructured.ingest.runner import FsspecBaseRunner, runner_map + + +def orchestrate_runner( + source_cmd: str, + writer_type: str, + parent_options: dict, + options: dict, + validate: t.Optional[t.List[t.Type[BaseConfig]]] = None, +): + runner_cls = runner_map[source_cmd] + configs = extract_configs( + parent_options, + extras={"fsspec_config": FsspecConfig} + if issubclass(runner_cls, FsspecBaseRunner) + else None, + ) + for val in validate: + val.from_dict(options) + runner_cls = runner_map[source_cmd] + runner = runner_cls( + **configs, # type: ignore + writer_type=writer_type, + writer_kwargs=options, + ) + runner.run( + **parent_options, + ) def conform_click_options(options: dict):