Skip to content

Commit

Permalink
Extract logic to run the runner
Browse files Browse the repository at this point in the history
  • Loading branch information
rbiseck3 committed Oct 20, 2023
1 parent a98e845 commit 51119e6
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 58 deletions.
26 changes: 9 additions & 17 deletions unstructured/ingest/cli/cmds/azure_cognitive_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
from unstructured.ingest.cli.interfaces import (
CliMixin,
)
from unstructured.ingest.cli.utils import conform_click_options, extract_configs
from unstructured.ingest.interfaces import BaseConfig, FsspecConfig
from unstructured.ingest.cli.utils import conform_click_options, orchestrate_runner
from unstructured.ingest.interfaces import BaseConfig
from unstructured.ingest.logger import ingest_log_streaming_init, logger
from unstructured.ingest.runner import FsspecBaseRunner, runner_map

pass


@dataclass
Expand Down Expand Up @@ -68,21 +69,12 @@ def azure_cognitive_search_dest(ctx: click.Context, **options):
log_options(parent_options, verbose=verbose)
log_options(options, verbose=verbose)
try:
runner_cls = runner_map[source_cmd]
configs = extract_configs(
parent_options,
extras={"fsspec_config": FsspecConfig}
if issubclass(runner_cls, FsspecBaseRunner)
else None,
)
runner_cls = runner_map[source_cmd]
runner = runner_cls(
**configs, # type: ignore
orchestrate_runner(
source_cmd=source_cmd,
writer_type="azure_cognitive_search",
writer_kwargs=options,
)
runner.run(
**parent_options,
parent_options=parent_options,
options=options,
validate=[AzureCognitiveSearchCliWriteConfig],
)
except Exception as e:
logger.error(e, exc_info=True)
Expand Down
33 changes: 14 additions & 19 deletions unstructured/ingest/cli/cmds/delta_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,16 @@
from unstructured.ingest.cli.interfaces import (
CliMixin,
)
from unstructured.ingest.cli.utils import Group, add_options, conform_click_options, extract_configs
from unstructured.ingest.interfaces import BaseConfig, FsspecConfig
from unstructured.ingest.cli.utils import (
Group,
add_options,
conform_click_options,
extract_configs,
orchestrate_runner,
)
from unstructured.ingest.interfaces import BaseConfig
from unstructured.ingest.logger import ingest_log_streaming_init, logger
from unstructured.ingest.runner import DeltaTableRunner, FsspecBaseRunner, runner_map
from unstructured.ingest.runner import DeltaTableRunner


@dataclass
Expand Down Expand Up @@ -118,23 +124,12 @@ def delta_table_dest(ctx: click.Context, **options):
log_options(parent_options, verbose=verbose)
log_options(options, verbose=verbose)
try:
runner_cls = runner_map[source_cmd]
configs = extract_configs(
parent_options,
extras={"fsspec_config": FsspecConfig}
if issubclass(runner_cls, FsspecBaseRunner)
else None,
)
# Validate write configs
DeltaTableCliWriteConfig.from_dict(options)
runner_cls = runner_map[source_cmd]
runner = runner_cls(
**configs, # type: ignore
orchestrate_runner(
source_cmd=source_cmd,
writer_type="delta_table",
writer_kwargs=options,
)
runner.run(
**parent_options,
parent_options=parent_options,
options=options,
validate=[DeltaTableCliWriteConfig],
)
except Exception as e:
logger.error(e, exc_info=True)
Expand Down
32 changes: 13 additions & 19 deletions unstructured/ingest/cli/cmds/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,16 @@
CliFilesStorageConfig,
CliMixin,
)
from unstructured.ingest.cli.utils import Group, add_options, conform_click_options, extract_configs
from unstructured.ingest.cli.utils import (
Group,
add_options,
conform_click_options,
extract_configs,
orchestrate_runner,
)
from unstructured.ingest.interfaces import BaseConfig, FsspecConfig
from unstructured.ingest.logger import ingest_log_streaming_init, logger
from unstructured.ingest.runner import FsspecBaseRunner, S3Runner, runner_map
from unstructured.ingest.runner import S3Runner


@dataclass
Expand Down Expand Up @@ -85,24 +91,12 @@ def s3_dest(ctx: click.Context, **options):
log_options(parent_options, verbose=verbose)
log_options(options, verbose=verbose)
try:
runner_cls = runner_map[source_cmd]
configs = extract_configs(
parent_options,
validate=[S3CliConfig],
extras={"fsspec_config": FsspecConfig}
if issubclass(runner_cls, FsspecBaseRunner)
else None,
)
# validate dest cli
S3CliConfig.from_dict(options)
CliFilesStorageConfig.from_dict(options)
runner = runner_cls(
**configs, # type: ignore
orchestrate_runner(
source_cmd=source_cmd,
writer_type="s3",
writer_kwargs=options,
)
runner.run(
**parent_options,
parent_options=parent_options,
options=options,
validate=[S3CliConfig, CliFilesStorageConfig],
)
except Exception as e:
logger.error(e, exc_info=True)
Expand Down
32 changes: 29 additions & 3 deletions unstructured/ingest/cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,35 @@
CliReadConfig,
CliRetryStrategyConfig,
)
from unstructured.ingest.interfaces import (
BaseConfig,
)
from unstructured.ingest.interfaces import BaseConfig, FsspecConfig
from unstructured.ingest.runner import FsspecBaseRunner, runner_map


def orchestrate_runner(
    source_cmd: str,
    writer_type: str,
    parent_options: dict,
    options: dict,
    validate: t.Optional[t.List[t.Type[BaseConfig]]] = None,
):
    """Build the runner registered for ``source_cmd`` and run it with a writer attached.

    Args:
        source_cmd: Key used to look up the runner class in ``runner_map``.
        writer_type: Identifier of the destination writer, forwarded to the
            runner constructor as ``writer_type``.
        parent_options: Options of the source (parent) command. They are fed
            to ``extract_configs`` to build the runner's configs and are also
            passed as keyword arguments to ``runner.run``.
        options: Options of the destination command, forwarded to the runner
            constructor as ``writer_kwargs``.
        validate: Optional config classes whose ``from_dict`` is called on
            ``options`` before the runner is built; the results are discarded,
            the call is made only so that invalid options fail early.
            ``None`` (the default) or an empty list skips validation.

    Raises:
        Whatever the ``runner_map`` lookup, ``extract_configs``, the
        ``from_dict`` validators, or the runner itself raise; nothing is
        caught here (callers are expected to handle/log errors).
    """
    runner_cls = runner_map[source_cmd]
    configs = extract_configs(
        parent_options,
        # Only fsspec-based runners take the extra ``fsspec_config`` entry.
        extras={"fsspec_config": FsspecConfig}
        if issubclass(runner_cls, FsspecBaseRunner)
        else None,
    )
    # ``validate`` defaults to None, so guard before iterating.
    for config_cls in validate or []:
        config_cls.from_dict(options)
    runner = runner_cls(
        **configs,  # type: ignore
        writer_type=writer_type,
        writer_kwargs=options,
    )
    runner.run(
        **parent_options,
    )


def conform_click_options(options: dict):
Expand Down

0 comments on commit 51119e6

Please sign in to comment.