Skip to content

Commit

Permalink
Add fsspec specific write configs to s3 dest connector
Browse files Browse the repository at this point in the history
  • Loading branch information
rbiseck3 committed Oct 17, 2023
1 parent b727a48 commit 533eb98
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 8 deletions.
41 changes: 40 additions & 1 deletion unstructured/ingest/cli/cmds/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,51 @@
from unstructured.ingest.cli.interfaces import (
CliFilesStorageConfig,
)
from unstructured.ingest.interfaces import (
FsspecConfig,
)
from unstructured.ingest.cli.interfaces import CliMixin
from unstructured.ingest.cli.utils import Group, add_options, conform_click_options, extract_configs
from unstructured.ingest.interfaces import FsspecConfig
from unstructured.ingest.connector.fsspec import FsspecWriteConfig
from unstructured.ingest.logger import ingest_log_streaming_init, logger
from unstructured.ingest.runner import FsspecRunner


class FsspecCliWriteConfigs(FsspecWriteConfig, CliMixin):
    """CLI-facing variant of ``FsspecWriteConfig``.

    Registers the ``--filename``, ``--indent`` and ``--encoding`` options,
    which correspond to the ``filename``, ``indent`` and ``encoding`` write
    settings.
    """

    @staticmethod
    def add_cli_options(cmd: click.Command) -> None:
        """Append the write-related click options to ``cmd.params``.

        Args:
            cmd: The click command that should accept the write options.
        """
        filename_option = click.Option(
            ["--filename"],
            default=None,
            type=str,
            help="When uploading a single file to s3, what the filename should be. "
            "Can be omitted if the remote path set contains the filename",
        )
        indent_option = click.Option(
            ["--indent"],
            type=int,
            default=4,
            help="What indent to use if the content needs to be converted to json. "
            "Only applies when writing a list of elements.",
        )
        encoding_option = click.Option(
            ["--encoding"],
            type=str,
            default="utf-8",
            help="what encoding to use when writing the contents to a file. "
            "Only applies when writing a list of elements.",
        )
        # Registration order is kept: filename, indent, encoding.
        cmd.params.extend([filename_option, indent_option, encoding_option])


@click.group(name="fsspec", invoke_without_command=True, cls=Group)
@click.pass_context
def fsspec_source(ctx: click.Context, **options):
Expand Down
5 changes: 5 additions & 0 deletions unstructured/ingest/cli/cmds/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import click

from unstructured.ingest.cli.cmds.fsspec import FsspecCliWriteConfigs
from unstructured.ingest.cli.common import (
log_options,
)
Expand Down Expand Up @@ -85,6 +86,9 @@ def s3_dest(ctx: click.Context, **options):
log_options(parent_options, verbose=verbose)
log_options(options, verbose=verbose)
try:
configs = extract_configs(options, validate=[S3CliConfig])
# Validate
FsspecCliWriteConfigs.from_dict(configs)
runner_cls = runner_map[source_cmd]
configs = extract_configs(
options,
Expand All @@ -110,6 +114,7 @@ def get_dest_cmd() -> click.Command:
cmd = s3_dest
S3CliConfig.add_cli_options(cmd)
CliFilesStorageConfig.add_cli_options(cmd)
FsspecCliWriteConfigs.add_cli_options(cmd)
return cmd


Expand Down
27 changes: 22 additions & 5 deletions unstructured/ingest/connector/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
IngestDocCleanupMixin,
SourceConnectorCleanupMixin,
SourceMetadata,
WriteConfig,
)
from unstructured.ingest.logger import logger
from unstructured.staging.base import convert_to_dict
Expand Down Expand Up @@ -224,9 +225,17 @@ def get_ingest_docs(self):
return docs


@dataclass
class FsspecWriteConfig(WriteConfig):
    """Write settings consumed by fsspec-based destination connectors."""

    # Optional name of the uploaded file; unset (None) by default.
    filename: t.Optional[str] = None
    # Indentation width handed to the JSON serializer.
    indent: int = 4
    # Text encoding of the file the serialized content is written to.
    encoding: str = "utf-8"


@dataclass
class FsspecDestinationConnector(BaseDestinationConnector):
connector_config: SimpleFsspecConfig
write_config: FsspecWriteConfig

def initialize(self):
from fsspec import AbstractFileSystem, get_filesystem_class
Expand All @@ -243,13 +252,21 @@ def write_elements(self, elements: t.List[Element]) -> None:
)

logger.info(f"Writing content using filesystem: {type(fs).__name__}")
s3_folder = self.connector_config.path

s3_path = self.connector_config.path
s3_output_path = (
str(PurePath(s3_folder, self.write_config.filename))
if self.write_config.filename
else s3_folder
)
element_dict = convert_to_dict(elements)
with tempfile.NamedTemporaryFile(mode="w+") as tmp_file:
json.dump(element_dict, tmp_file)
logger.debug(f"Uploading {tmp_file.name} -> {s3_path}")
fs.put_file(lpath=tmp_file.name, rpath=s3_path)
with tempfile.NamedTemporaryFile(
mode="w+",
encoding=self.write_config.encoding,
) as tmp_file:
json.dump(element_dict, tmp_file, indent=self.write_config.indent)
logger.debug(f"Uploading {tmp_file.name} -> {s3_output_path}")
fs.put_file(lpath=tmp_file.name, rpath=s3_output_path)

def write(self, docs: t.List[BaseIngestDoc]) -> None:
from fsspec import AbstractFileSystem, get_filesystem_class
Expand Down
7 changes: 7 additions & 0 deletions unstructured/ingest/connector/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
FsspecDestinationConnector,
FsspecIngestDoc,
FsspecSourceConnector,
FsspecWriteConfig,
SimpleFsspecConfig,
)
from unstructured.utils import requires_dependencies
Expand Down Expand Up @@ -34,6 +35,12 @@ def __post_init__(self):
self.ingest_doc_cls: Type[S3IngestDoc] = S3IngestDoc


@dataclass
class S3WriteConfig(FsspecWriteConfig):
    """Write settings for the S3 destination; adds no fields of its own."""


@dataclass
class S3DestinationConnector(FsspecDestinationConnector):
connector_config: SimpleS3Config
write_config: S3WriteConfig
11 changes: 9 additions & 2 deletions unstructured/ingest/runner/writers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import typing as t
from pathlib import Path

from unstructured.ingest.interfaces import WriteConfig
from unstructured.ingest.connector.s3 import S3WriteConfig
from unstructured.utils import requires_dependencies


Expand All @@ -10,6 +10,9 @@ def s3_writer(
remote_url: str,
anonymous: bool,
endpoint_url: t.Optional[str] = None,
filename: t.Optional[str] = None,
indent: int = 4,
encoding: str = "utf-8",
verbose: bool = False,
**kwargs,
):
Expand All @@ -23,7 +26,11 @@ def s3_writer(
access_kwargs["endpoint_url"] = endpoint_url

return S3DestinationConnector(
write_config=WriteConfig(),
write_config=S3WriteConfig(
filename=filename,
encoding=encoding,
indent=indent,
),
connector_config=SimpleS3Config(
remote_url=remote_url,
access_kwargs=access_kwargs,
Expand Down

0 comments on commit 533eb98

Please sign in to comment.