Skip to content

Commit

Permalink
restructure write params for fsspec method
Browse files: browse the repository at this point in the history
  • Loading branch information
rbiseck3 committed Oct 17, 2023
1 parent 1adff16 commit 8994845
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 9 deletions.
2 changes: 1 addition & 1 deletion unstructured/ingest/connector/azure_cognitive_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def conform_dict(self, data: dict) -> None:
if page_number := data.get("metadata", {}).get("page_number"):
data["metadata"]["page_number"] = str(page_number)

def write_elements(self, elements: t.List[Element]) -> None:
def write_elements(self, elements: t.List[Element], *args, **kwargs) -> None:
elements_dict = convert_to_dict(elements)
self.write_dict(json_list=elements_dict)

Expand Down
2 changes: 1 addition & 1 deletion unstructured/ingest/connector/delta_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class DeltaTableDestinationConnector(BaseDestinationConnector):
def initialize(self):
pass

def write_elements(self, elements: t.List[Element]) -> None:
def write_elements(self, elements: t.List[Element], *args, **kwargs) -> None:
elements_json = [json.dumps(e.to_dict()) for e in elements]
self.write_dict(json_list=elements_json)

Expand Down
22 changes: 16 additions & 6 deletions unstructured/ingest/connector/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,15 @@ def initialize(self):
**self.connector_config.get_access_kwargs(),
)

def write_elements(self, elements: t.List[Element]) -> None:
def write_elements(
self,
elements: t.List[Element],
filename: t.Optional[str] = None,
indent: int = 4,
encoding: str = "utf-8",
*args,
**kwargs,
) -> None:
from fsspec import AbstractFileSystem, get_filesystem_class

fs: AbstractFileSystem = get_filesystem_class(self.connector_config.protocol)(
Expand All @@ -244,12 +252,14 @@ def write_elements(self, elements: t.List[Element]) -> None:

logger.info(f"Writing content using filesystem: {type(fs).__name__}")

s3_path = self.connector_config.path
s3_folder = self.connector_config.path

s3_output_path = str(PurePath(s3_folder, filename)) if filename else s3_folder
element_dict = convert_to_dict(elements)
with tempfile.NamedTemporaryFile(mode="w+") as tmp_file:
json.dump(element_dict, tmp_file)
logger.debug(f"Uploading {tmp_file.name} -> {s3_path}")
fs.put_file(lpath=tmp_file.name, rpath=s3_path)
with tempfile.NamedTemporaryFile(mode="w+", encoding=encoding) as tmp_file:
json.dump(element_dict, tmp_file, indent=indent)
logger.debug(f"Uploading {tmp_file.name} -> {s3_output_path}")
fs.put_file(lpath=tmp_file.name, rpath=s3_output_path)

def write(self, docs: t.List[BaseIngestDoc]) -> None:
from fsspec import AbstractFileSystem, get_filesystem_class
Expand Down
2 changes: 1 addition & 1 deletion unstructured/ingest/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ def write(self, docs: t.List[BaseIngestDoc]) -> None:
pass

@abstractmethod
def write_elements(self, elements: t.List[Element]) -> None:
def write_elements(self, elements: t.List[Element], *args, **kwargs) -> None:
pass


Expand Down

0 comments on commit 8994845

Please sign in to comment.