Create shared pattern across writing approaches
rbiseck3 committed Oct 17, 2023
1 parent 8994845 commit 11ccecf
Showing 4 changed files with 26 additions and 46 deletions.
8 changes: 1 addition & 7 deletions unstructured/ingest/connector/azure_cognitive_search.py
@@ -5,7 +5,6 @@
 
 import azure.core.exceptions
 
-from unstructured.documents.elements import Element
 from unstructured.ingest.error import WriteError
 from unstructured.ingest.interfaces import (
     BaseConnectorConfig,
@@ -14,7 +13,6 @@
     WriteConfig,
 )
 from unstructured.ingest.logger import logger
-from unstructured.staging.base import convert_to_dict
 from unstructured.utils import requires_dependencies
 
 
@@ -84,11 +82,7 @@ def conform_dict(self, data: dict) -> None:
         if page_number := data.get("metadata", {}).get("page_number"):
             data["metadata"]["page_number"] = str(page_number)
 
-    def write_elements(self, elements: t.List[Element], *args, **kwargs) -> None:
-        elements_dict = convert_to_dict(elements)
-        self.write_dict(json_list=elements_dict)
-
-    def write_dict(self, json_list: t.List[t.Dict[str, t.Any]]) -> None:
+    def write_dict(self, *args, json_list: t.List[t.Dict[str, t.Any]], **kwargs) -> None:
         logger.info(
             f"writing {len(json_list)} documents to destination "
             f"index at {self.write_config.index}",
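Worth noting about the shared signature: `json_list` now sits after `*args`, which makes it keyword-only, so no caller can pass the list positionally by accident. A minimal standalone sketch of that effect (toy function, not the connector itself):

import typing as t


def write_dict(*args, json_list: t.List[t.Dict[str, t.Any]], **kwargs) -> None:
    # json_list follows *args, so Python only accepts it by keyword.
    print(f"writing {len(json_list)} documents")


write_dict(json_list=[{"text": "hello"}])  # ok
# write_dict([{"text": "hello"}])  # TypeError: missing required
#                                  # keyword-only argument: 'json_list'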
20 changes: 8 additions & 12 deletions unstructured/ingest/connector/delta_table.py
@@ -8,7 +8,6 @@
 
 import pandas as pd
 
-from unstructured.documents.elements import Element
 from unstructured.ingest.error import SourceConnectionError
 from unstructured.ingest.interfaces import (
     BaseConnectorConfig,
@@ -167,15 +166,13 @@ class DeltaTableDestinationConnector(BaseDestinationConnector):
     def initialize(self):
         pass
 
-    def write_elements(self, elements: t.List[Element], *args, **kwargs) -> None:
-        elements_json = [json.dumps(e.to_dict()) for e in elements]
-        self.write_dict(json_list=elements_json)
-
-    def write_dict(self, json_list: t.List[str]) -> None:
+    def write_dict(self, *args, json_list: t.List[t.Dict[str, t.Any]], **kwargs) -> None:
+        # Need json list as strings
+        json_list_s = [json.dumps(e) for e in json_list]
         from deltalake.writer import write_deltalake
 
         logger.info(
-            f"writing {len(json_list)} rows to destination "
+            f"writing {len(json_list_s)} rows to destination "
             f"table at {self.connector_config.table_uri}",
         )
         # NOTE: deltalake writer on Linux sometimes can finish but still trigger a SIGABRT and cause
@@ -186,7 +183,7 @@ def write_dict(self, json_list: t.List[str]) -> None:
             target=write_deltalake,
             kwargs={
                 "table_or_uri": self.connector_config.table_uri,
-                "data": pd.DataFrame(data={self.write_config.write_column: json_list}),
+                "data": pd.DataFrame(data={self.write_config.write_column: json_list_s}),
                 "mode": self.write_config.mode,
             },
         )
@@ -195,12 +192,11 @@ def write_dict(self, json_list: t.List[str]) -> None:
 
     @requires_dependencies(["deltalake"], extras="delta-table")
     def write(self, docs: t.List[BaseIngestDoc]) -> None:
-        json_list: t.List[str] = []
+        json_list: t.List[t.Dict[str, t.Any]] = []
         for doc in docs:
             local_path = doc._output_filename
             with open(local_path) as json_file:
                 json_content = json.load(json_file)
-                json_items = [json.dumps(j) for j in json_content]
-                logger.info(f"converting {len(json_items)} rows from content in {local_path}")
-                json_list.extend(json_items)
+                logger.info(f"converting {len(json_content)} rows from content in {local_path}")
+                json_list.extend(json_content)
         self.write_dict(json_list=json_list)
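The NOTE above explains the indirection in this file: `write_deltalake` on Linux can finish its work and still raise SIGABRT, so the connector pushes the call into a child process. A simplified sketch of the same pattern, assuming a local table path; the function and column names are illustrative, not the connector's:

from multiprocessing import Process

import pandas as pd
from deltalake.writer import write_deltalake


def write_rows(table_uri: str, rows: list, column: str, mode: str = "overwrite") -> None:
    # Run the writer in a child process so a stray SIGABRT
    # from deltalake cannot kill the calling process.
    writer = Process(
        target=write_deltalake,
        kwargs={
            "table_or_uri": table_uri,
            "data": pd.DataFrame(data={column: rows}),
            "mode": mode,
        },
    )
    writer.start()
    writer.join()


write_rows("/tmp/elements_table", ['{"text": "hello"}'], column="json_data")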
38 changes: 12 additions & 26 deletions unstructured/ingest/connector/fsspec.py
@@ -1,12 +1,10 @@
 import json
 import os
-import tempfile
 import typing as t
 from contextlib import suppress
 from dataclasses import dataclass
 from pathlib import Path, PurePath
 
-from unstructured.documents.elements import Element
 from unstructured.ingest.compression_support import (
     TAR_FILE_EXT,
     ZIP_FILE_EXT,
@@ -24,7 +22,6 @@
     SourceMetadata,
 )
 from unstructured.ingest.logger import logger
-from unstructured.staging.base import convert_to_dict
 from unstructured.utils import (
     requires_dependencies,
 )
@@ -235,13 +232,13 @@ def initialize(self):
             **self.connector_config.get_access_kwargs(),
         )
 
-    def write_elements(
+    def write_dict(
         self,
-        elements: t.List[Element],
+        *args,
+        json_list: t.List[t.Dict[str, t.Any]],
         filename: t.Optional[str] = None,
         indent: int = 4,
         encoding: str = "utf-8",
-        *args,
         **kwargs,
     ) -> None:
         from fsspec import AbstractFileSystem, get_filesystem_class
@@ -252,28 +249,17 @@ def write_elements(
 
         logger.info(f"Writing content using filesystem: {type(fs).__name__}")
 
-        s3_folder = self.connector_config.path
-
+        s3_folder = self.connector_config.path_without_protocol
         s3_output_path = str(PurePath(s3_folder, filename)) if filename else s3_folder
-        element_dict = convert_to_dict(elements)
-        with tempfile.NamedTemporaryFile(mode="w+", encoding=encoding) as tmp_file:
-            json.dump(element_dict, tmp_file, indent=indent)
-            logger.debug(f"Uploading {tmp_file.name} -> {s3_output_path}")
-            fs.put_file(lpath=tmp_file.name, rpath=s3_output_path)
+        full_s3_path = f"s3://{s3_output_path}"
+        logger.debug(f"uploading content to {full_s3_path}")
+        fs.write_text(full_s3_path, json.dumps(json_list, indent=indent), encoding=encoding)
 
     def write(self, docs: t.List[BaseIngestDoc]) -> None:
-        from fsspec import AbstractFileSystem, get_filesystem_class
-
-        fs: AbstractFileSystem = get_filesystem_class(self.connector_config.protocol)(
-            **self.connector_config.get_access_kwargs(),
-        )
-
-        logger.info(f"Writing content using filesystem: {type(fs).__name__}")
-
         for doc in docs:
             s3_file_path = doc.base_filename
-            s3_folder = self.connector_config.remote_url
-
-            s3_output_path = str(PurePath(s3_folder, s3_file_path)) if s3_file_path else s3_folder
-            logger.debug(f"Uploading {doc._output_filename} -> {s3_output_path}")
-            fs.put_file(lpath=doc._output_filename, rpath=s3_output_path)
+            filename = s3_file_path if s3_file_path else None
+            with open(doc._output_filename) as json_file:
+                logger.debug(f"uploading content from {doc._output_filename}")
+                json_list = json.load(json_file)
+                self.write_dict(json_list=json_list, filename=filename)
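The old path staged a `NamedTemporaryFile` and uploaded it with `fs.put_file`; the new path serializes the dicts directly with `fs.write_text`. A minimal sketch of the same call shape, using fsspec's `local` protocol in place of `s3` (the output path is made up for illustration):

import json

from fsspec import AbstractFileSystem, get_filesystem_class

fs: AbstractFileSystem = get_filesystem_class("local")()
fs.makedirs("/tmp/ingest-output", exist_ok=True)

json_list = [{"type": "NarrativeText", "text": "hello"}]
fs.write_text("/tmp/ingest-output/doc.json", json.dumps(json_list, indent=4), encoding="utf-8")

One quirk worth flagging: the new `full_s3_path` hardcodes an `s3://` prefix even though this file is the generic fsspec base, so protocols other than s3 would get a mislabeled destination path here.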
6 changes: 5 additions & 1 deletion unstructured/ingest/interfaces.py
@@ -484,9 +484,13 @@ def write(self, docs: t.List[BaseIngestDoc]) -> None:
         pass
 
     @abstractmethod
-    def write_elements(self, elements: t.List[Element], *args, **kwargs) -> None:
+    def write_dict(self, *args, json_list: t.List[t.Dict[str, t.Any]], **kwargs) -> None:
         pass
 
+    def write_elements(self, elements: t.List[Element], *args, **kwargs) -> None:
+        elements_json = [e.to_dict() for e in elements]
+        self.write_dict(*args, json_list=elements_json, **kwargs)
+
 
 class SourceConnectorCleanupMixin:
     read_config: ReadConfig
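This last hunk is the shared pattern the commit title describes: `write_elements` becomes a concrete template method on the base destination connector that performs the element-to-dict conversion once, while each connector overrides only the abstract `write_dict`. A trimmed sketch of the resulting shape (base classes and the `Element` type simplified away):

import typing as t
from abc import ABC, abstractmethod


class BaseDestinationConnector(ABC):
    @abstractmethod
    def write_dict(self, *args, json_list: t.List[t.Dict[str, t.Any]], **kwargs) -> None:
        """Each destination implements only the dict-level write."""

    def write_elements(self, elements: t.List[t.Any], *args, **kwargs) -> None:
        # Conversion lives here once instead of being duplicated per connector.
        elements_json = [e.to_dict() for e in elements]
        self.write_dict(*args, json_list=elements_json, **kwargs)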
