diff --git a/CHANGELOG.md b/CHANGELOG.md index 8487f4490e..2808f5657c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -## 0.10.25-dev5 +## 0.10.25-dev6 ### Enhancements @@ -15,6 +15,7 @@ * **Fix language detection of elements with empty strings** This resolves a warning message that was raised by `langdetect` if the language was attempted to be detected on an empty string. Language detection is now skipped for empty strings. * **Fix chunks breaking on regex-metadata matches.** Fixes "over-chunking" when `regex_metadata` was used, where every element that contained a regex-match would start a new chunk. * **Fix regex-metadata match offsets not adjusted within chunk.** Fixes incorrect regex-metadata match start/stop offset in chunks where multiple elements are combined. +* **Map source cli command configs when destination set** Due to how the source connector is dynamically called when the destination connector is set via the CLI, the configs were being set incorrectly, causing the source connector to break. The configs were fixed and updated to take into account Fsspec-specific connectors. 
## 0.10.24 diff --git a/unstructured/__version__.py b/unstructured/__version__.py index f6a6b1837a..e93cf8b261 100644 --- a/unstructured/__version__.py +++ b/unstructured/__version__.py @@ -1 +1 @@ -__version__ = "0.10.25-dev5" # pragma: no cover +__version__ = "0.10.25-dev6" # pragma: no cover diff --git a/unstructured/ingest/cli/cmds/azure_cognitive_search.py b/unstructured/ingest/cli/cmds/azure_cognitive_search.py index b7ccf4f1df..ac11da932b 100644 --- a/unstructured/ingest/cli/cmds/azure_cognitive_search.py +++ b/unstructured/ingest/cli/cmds/azure_cognitive_search.py @@ -10,10 +10,11 @@ from unstructured.ingest.cli.interfaces import ( CliMixin, ) -from unstructured.ingest.cli.utils import conform_click_options, extract_configs +from unstructured.ingest.cli.utils import conform_click_options, orchestrate_runner from unstructured.ingest.interfaces import BaseConfig from unstructured.ingest.logger import ingest_log_streaming_init, logger -from unstructured.ingest.runner import runner_map + +pass @dataclass @@ -68,15 +69,12 @@ def azure_cognitive_search_dest(ctx: click.Context, **options): log_options(parent_options, verbose=verbose) log_options(options, verbose=verbose) try: - configs = extract_configs(options, validate=[AzureCognitiveSearchCliWriteConfig]) - runner_cls = runner_map[source_cmd] - runner = runner_cls( - **configs, # type: ignore + orchestrate_runner( + source_cmd=source_cmd, writer_type="azure_cognitive_search", - writer_kwargs=options, - ) - runner.run( - **parent_options, + parent_options=parent_options, + options=options, + validate=[AzureCognitiveSearchCliWriteConfig], ) except Exception as e: logger.error(e, exc_info=True) diff --git a/unstructured/ingest/cli/cmds/delta_table.py b/unstructured/ingest/cli/cmds/delta_table.py index 1ee17be364..bb1beb2dbd 100644 --- a/unstructured/ingest/cli/cmds/delta_table.py +++ b/unstructured/ingest/cli/cmds/delta_table.py @@ -10,10 +10,16 @@ from unstructured.ingest.cli.interfaces import ( CliMixin, ) -from 
unstructured.ingest.cli.utils import Group, add_options, conform_click_options, extract_configs +from unstructured.ingest.cli.utils import ( + Group, + add_options, + conform_click_options, + extract_configs, + orchestrate_runner, +) from unstructured.ingest.interfaces import BaseConfig from unstructured.ingest.logger import ingest_log_streaming_init, logger -from unstructured.ingest.runner import DeltaTableRunner, runner_map +from unstructured.ingest.runner import DeltaTableRunner @dataclass @@ -118,17 +124,12 @@ def delta_table_dest(ctx: click.Context, **options): log_options(parent_options, verbose=verbose) log_options(options, verbose=verbose) try: - configs = extract_configs(parent_options, validate=[DeltaTableCliConfig]) - # Validate write configs - DeltaTableCliWriteConfig.from_dict(options) - runner_cls = runner_map[source_cmd] - runner = runner_cls( - **configs, # type: ignore + orchestrate_runner( + source_cmd=source_cmd, writer_type="delta_table", - writer_kwargs=options, - ) - runner.run( - **parent_options, + parent_options=parent_options, + options=options, + validate=[DeltaTableCliWriteConfig], ) except Exception as e: logger.error(e, exc_info=True) diff --git a/unstructured/ingest/cli/cmds/s3.py b/unstructured/ingest/cli/cmds/s3.py index 2dc17cdf91..d4d701c051 100644 --- a/unstructured/ingest/cli/cmds/s3.py +++ b/unstructured/ingest/cli/cmds/s3.py @@ -11,10 +11,16 @@ CliFilesStorageConfig, CliMixin, ) -from unstructured.ingest.cli.utils import Group, add_options, conform_click_options, extract_configs +from unstructured.ingest.cli.utils import ( + Group, + add_options, + conform_click_options, + extract_configs, + orchestrate_runner, +) from unstructured.ingest.interfaces import BaseConfig, FsspecConfig from unstructured.ingest.logger import ingest_log_streaming_init, logger -from unstructured.ingest.runner import FsspecBaseRunner, S3Runner, runner_map +from unstructured.ingest.runner import S3Runner @dataclass @@ -85,22 +91,12 @@ def s3_dest(ctx: 
click.Context, **options): log_options(parent_options, verbose=verbose) log_options(options, verbose=verbose) try: - configs = extract_configs(options, validate=[S3CliConfig]) - runner_cls = runner_map[source_cmd] - configs = extract_configs( - options, - validate=[S3CliConfig], - extras={"fsspec_config": FsspecConfig} - if issubclass(runner_cls, FsspecBaseRunner) - else None, - ) - runner = runner_cls( - **configs, # type: ignore + orchestrate_runner( + source_cmd=source_cmd, writer_type="s3", - writer_kwargs=options, - ) - runner.run( - **parent_options, + parent_options=parent_options, + options=options, + validate=[S3CliConfig, CliFilesStorageConfig], ) except Exception as e: logger.error(e, exc_info=True) diff --git a/unstructured/ingest/cli/utils.py b/unstructured/ingest/cli/utils.py index 6a03a474b9..faebdb5228 100644 --- a/unstructured/ingest/cli/utils.py +++ b/unstructured/ingest/cli/utils.py @@ -13,9 +13,35 @@ CliReadConfig, CliRetryStrategyConfig, ) -from unstructured.ingest.interfaces import ( - BaseConfig, -) +from unstructured.ingest.interfaces import BaseConfig, FsspecConfig +from unstructured.ingest.runner import FsspecBaseRunner, runner_map + + +def orchestrate_runner( + source_cmd: str, + writer_type: str, + parent_options: dict, + options: dict, + validate: t.Optional[t.List[t.Type[BaseConfig]]] = None, +): + runner_cls = runner_map[source_cmd] + configs = extract_configs( + parent_options, + extras={"fsspec_config": FsspecConfig} + if issubclass(runner_cls, FsspecBaseRunner) + else None, + ) + for val in validate or []: + val.from_dict(options) + runner_cls = runner_map[source_cmd] + runner = runner_cls( + **configs, # type: ignore + writer_type=writer_type, + writer_kwargs=options, + ) + runner.run( + **parent_options, + ) def conform_click_options(options: dict): diff --git a/unstructured/ingest/connector/fsspec.py b/unstructured/ingest/connector/fsspec.py index 50faa57c17..1aaf2aeac5 100644 --- a/unstructured/ingest/connector/fsspec.py +++ 
b/unstructured/ingest/connector/fsspec.py @@ -249,16 +249,20 @@ def write_dict( logger.info(f"Writing content using filesystem: {type(fs).__name__}") - s3_folder = self.connector_config.path_without_protocol - s3_output_path = str(PurePath(s3_folder, filename)) if filename else s3_folder - full_s3_path = f"s3://{s3_output_path}" - logger.debug(f"uploading content to {full_s3_path}") - fs.write_text(full_s3_path, json.dumps(json_list, indent=indent), encoding=encoding) + output_folder = self.connector_config.path_without_protocol + output_folder = os.path.join(output_folder) # Make sure folder ends with file separator + filename = ( + filename.strip(os.sep) if filename else filename + ) # Make sure filename doesn't begin with file separator + output_path = str(PurePath(output_folder, filename)) if filename else output_folder + full_output_path = f"s3://{output_path}" + logger.debug(f"uploading content to {full_output_path}") + fs.write_text(full_output_path, json.dumps(json_list, indent=indent), encoding=encoding) def write(self, docs: t.List[BaseIngestDoc]) -> None: for doc in docs: - s3_file_path = doc.base_filename - filename = s3_file_path if s3_file_path else None + file_path = doc.base_filename + filename = file_path if file_path else None with open(doc._output_filename) as json_file: logger.debug(f"uploading content from {doc._output_filename}") json_list = json.load(json_file)