Skip to content

Commit

Permalink
bugfix/mapping source connectors in destination cli commands (#1788)
Browse files Browse the repository at this point in the history
### Description
Due to the dynamic nature of how the source connector is called when a
destination command is invoked, the configs need to be mapped and the
fsspec config needs to be dynamically added based on the type of runner
being used. This code was added to all currently supported destination
commands.
  • Loading branch information
rbiseck3 authored Oct 20, 2023
1 parent 1b90028 commit 15b6969
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 51 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
## 0.10.25-dev5
## 0.10.25-dev6

### Enhancements

Expand All @@ -15,6 +15,7 @@
* **Fix language detection of elements with empty strings** This resolves a warning message that was raised by `langdetect` if the language was attempted to be detected on an empty string. Language detection is now skipped for empty strings.
* **Fix chunks breaking on regex-metadata matches.** Fixes "over-chunking" when `regex_metadata` was used, where every element that contained a regex-match would start a new chunk.
* **Fix regex-metadata match offsets not adjusted within chunk.** Fixes incorrect regex-metadata match start/stop offset in chunks where multiple elements are combined.
* **Map source cli command configs when destination set** Due to how the source connector is dynamically called when the destination connector is set via the CLI, the configs were being set incorrectoy, causing the source connector to break. The configs were fixed and updated to take into account Fsspec-specific connectors.

## 0.10.24

Expand Down
2 changes: 1 addition & 1 deletion unstructured/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.10.25-dev5" # pragma: no cover
__version__ = "0.10.25-dev6" # pragma: no cover
18 changes: 8 additions & 10 deletions unstructured/ingest/cli/cmds/azure_cognitive_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
from unstructured.ingest.cli.interfaces import (
CliMixin,
)
from unstructured.ingest.cli.utils import conform_click_options, extract_configs
from unstructured.ingest.cli.utils import conform_click_options, orchestrate_runner
from unstructured.ingest.interfaces import BaseConfig
from unstructured.ingest.logger import ingest_log_streaming_init, logger
from unstructured.ingest.runner import runner_map

pass


@dataclass
Expand Down Expand Up @@ -68,15 +69,12 @@ def azure_cognitive_search_dest(ctx: click.Context, **options):
log_options(parent_options, verbose=verbose)
log_options(options, verbose=verbose)
try:
configs = extract_configs(options, validate=[AzureCognitiveSearchCliWriteConfig])
runner_cls = runner_map[source_cmd]
runner = runner_cls(
**configs, # type: ignore
orchestrate_runner(
source_cmd=source_cmd,
writer_type="azure_cognitive_search",
writer_kwargs=options,
)
runner.run(
**parent_options,
parent_options=parent_options,
options=options,
validate=[AzureCognitiveSearchCliWriteConfig],
)
except Exception as e:
logger.error(e, exc_info=True)
Expand Down
25 changes: 13 additions & 12 deletions unstructured/ingest/cli/cmds/delta_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,16 @@
from unstructured.ingest.cli.interfaces import (
CliMixin,
)
from unstructured.ingest.cli.utils import Group, add_options, conform_click_options, extract_configs
from unstructured.ingest.cli.utils import (
Group,
add_options,
conform_click_options,
extract_configs,
orchestrate_runner,
)
from unstructured.ingest.interfaces import BaseConfig
from unstructured.ingest.logger import ingest_log_streaming_init, logger
from unstructured.ingest.runner import DeltaTableRunner, runner_map
from unstructured.ingest.runner import DeltaTableRunner


@dataclass
Expand Down Expand Up @@ -118,17 +124,12 @@ def delta_table_dest(ctx: click.Context, **options):
log_options(parent_options, verbose=verbose)
log_options(options, verbose=verbose)
try:
configs = extract_configs(parent_options, validate=[DeltaTableCliConfig])
# Validate write configs
DeltaTableCliWriteConfig.from_dict(options)
runner_cls = runner_map[source_cmd]
runner = runner_cls(
**configs, # type: ignore
orchestrate_runner(
source_cmd=source_cmd,
writer_type="delta_table",
writer_kwargs=options,
)
runner.run(
**parent_options,
parent_options=parent_options,
options=options,
validate=[DeltaTableCliWriteConfig],
)
except Exception as e:
logger.error(e, exc_info=True)
Expand Down
30 changes: 13 additions & 17 deletions unstructured/ingest/cli/cmds/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,16 @@
CliFilesStorageConfig,
CliMixin,
)
from unstructured.ingest.cli.utils import Group, add_options, conform_click_options, extract_configs
from unstructured.ingest.cli.utils import (
Group,
add_options,
conform_click_options,
extract_configs,
orchestrate_runner,
)
from unstructured.ingest.interfaces import BaseConfig, FsspecConfig
from unstructured.ingest.logger import ingest_log_streaming_init, logger
from unstructured.ingest.runner import FsspecBaseRunner, S3Runner, runner_map
from unstructured.ingest.runner import S3Runner


@dataclass
Expand Down Expand Up @@ -85,22 +91,12 @@ def s3_dest(ctx: click.Context, **options):
log_options(parent_options, verbose=verbose)
log_options(options, verbose=verbose)
try:
configs = extract_configs(options, validate=[S3CliConfig])
runner_cls = runner_map[source_cmd]
configs = extract_configs(
options,
validate=[S3CliConfig],
extras={"fsspec_config": FsspecConfig}
if issubclass(runner_cls, FsspecBaseRunner)
else None,
)
runner = runner_cls(
**configs, # type: ignore
orchestrate_runner(
source_cmd=source_cmd,
writer_type="s3",
writer_kwargs=options,
)
runner.run(
**parent_options,
parent_options=parent_options,
options=options,
validate=[S3CliConfig, CliFilesStorageConfig],
)
except Exception as e:
logger.error(e, exc_info=True)
Expand Down
32 changes: 29 additions & 3 deletions unstructured/ingest/cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,35 @@
CliReadConfig,
CliRetryStrategyConfig,
)
from unstructured.ingest.interfaces import (
BaseConfig,
)
from unstructured.ingest.interfaces import BaseConfig, FsspecConfig
from unstructured.ingest.runner import FsspecBaseRunner, runner_map


def orchestrate_runner(
source_cmd: str,
writer_type: str,
parent_options: dict,
options: dict,
validate: t.Optional[t.List[t.Type[BaseConfig]]] = None,
):
runner_cls = runner_map[source_cmd]
configs = extract_configs(
parent_options,
extras={"fsspec_config": FsspecConfig}
if issubclass(runner_cls, FsspecBaseRunner)
else None,
)
for val in validate:
val.from_dict(options)
runner_cls = runner_map[source_cmd]
runner = runner_cls(
**configs, # type: ignore
writer_type=writer_type,
writer_kwargs=options,
)
runner.run(
**parent_options,
)


def conform_click_options(options: dict):
Expand Down
18 changes: 11 additions & 7 deletions unstructured/ingest/connector/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,16 +249,20 @@ def write_dict(

logger.info(f"Writing content using filesystem: {type(fs).__name__}")

s3_folder = self.connector_config.path_without_protocol
s3_output_path = str(PurePath(s3_folder, filename)) if filename else s3_folder
full_s3_path = f"s3://{s3_output_path}"
logger.debug(f"uploading content to {full_s3_path}")
fs.write_text(full_s3_path, json.dumps(json_list, indent=indent), encoding=encoding)
output_folder = self.connector_config.path_without_protocol
output_folder = os.path.join(output_folder) # Make sure folder ends with file seperator
filename = (
filename.strip(os.sep) if filename else filename
) # Make sure filename doesn't begin with file seperator
output_path = str(PurePath(output_folder, filename)) if filename else output_folder
full_output_path = f"s3://{output_path}"
logger.debug(f"uploading content to {full_output_path}")
fs.write_text(full_output_path, json.dumps(json_list, indent=indent), encoding=encoding)

def write(self, docs: t.List[BaseIngestDoc]) -> None:
for doc in docs:
s3_file_path = doc.base_filename
filename = s3_file_path if s3_file_path else None
file_path = doc.base_filename
filename = file_path if file_path else None
with open(doc._output_filename) as json_file:
logger.debug(f"uploading content from {doc._output_filename}")
json_list = json.load(json_file)
Expand Down

0 comments on commit 15b6969

Please sign in to comment.