Skip to content

Commit

Permalink
Generalize approach to single function, re use elsewhere
Browse files Browse the repository at this point in the history
  • Loading branch information
rbiseck3 committed Jan 14, 2025
1 parent e3b4af8 commit 32eb53e
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 46 deletions.
24 changes: 1 addition & 23 deletions test/integration/connectors/utils/validation/destination.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
import json
import os
import shutil
from pathlib import Path

import ndjson

from test.integration.connectors.utils.validation.utils import ValidationConfig
from unstructured_ingest.utils.data_prep import get_data
from unstructured_ingest.v2.interfaces import FileData, SourceIdentifiers, UploadStager
from unstructured_ingest.v2.logger import logger


class StagerValidationConfigs(ValidationConfig):
Expand Down Expand Up @@ -47,25 +44,6 @@ def update_stager_fixtures(stager_output_path: Path, staged_filepath: Path):
shutil.copy(staged_filepath, copied_filepath)


def get_data(staged_filepath: Path) -> list[dict]:
if staged_filepath.suffix == ".json":
with staged_filepath.open() as f:
return json.load(f)
elif staged_filepath.suffix == ".ndjson":
with staged_filepath.open() as f:
return ndjson.load(f)
else:
# Attempt to read it as json
try:
with staged_filepath.open() as f:
logger.warning(
f"File extension mismatch, attempting to read as json: {staged_filepath}"
)
return json.load(f)
except Exception:
raise ValueError(f"Failed to read file: {staged_filepath}")


def stager_validation(
stager: UploadStager,
tmp_dir: Path,
Expand Down
33 changes: 32 additions & 1 deletion unstructured_ingest/utils/data_prep.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import ndjson
import pandas as pd

from unstructured_ingest.v2.logger import logger

DATE_FORMATS = ("%Y-%m-%d", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d+%H:%M:%S", "%Y-%m-%dT%H:%M:%S%z")

T = TypeVar("T")
Expand Down Expand Up @@ -135,7 +137,7 @@ def validate_date_args(date: Optional[str] = None) -> bool:
)


def get_data(path: Path) -> list[dict]:
def get_data_by_suffix(path: Path) -> list[dict]:
with path.open() as f:
if path.suffix == ".json":
return json.load(f)
Expand All @@ -151,6 +153,35 @@ def get_data(path: Path) -> list[dict]:
raise ValueError(f"Unsupported file type: {path}")


def get_data(path: Path) -> list[dict]:
try:
return get_data_by_suffix(path=path)
except Exception as e:
logger.warning(f"failed to read {path} by extension: {e}")
# Fall back
with path.open() as f:
try:
return json.load(f)
except Exception as e:
logger.warning(f"failed to read {path} as json: {e}")
try:
return ndjson.load(f)
except Exception as e:
logger.warning(f"failed to read {path} as ndjson: {e}")
try:
df = pd.read_csv(path)
return df.to_dict(orient="records")
except Exception as e:
logger.warning(f"failed to read {path} as csv: {e}")
try:
df = pd.read_parquet(path)
return df.to_dict(orient="records")
except Exception as e:
logger.warning(f"failed to read {path} as parquet: {e}")

raise IOError(f"File could not be parsed: {path}")


def get_data_df(path: Path) -> pd.DataFrame:
with path.open() as f:
if path.suffix == ".json":
Expand Down
19 changes: 0 additions & 19 deletions unstructured_ingest/v2/interfaces/upload_stager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

from unstructured_ingest.v2.interfaces.file_data import FileData
from unstructured_ingest.v2.interfaces.process import BaseProcess
from unstructured_ingest.v2.logger import logger


class UploadStagerConfig(BaseModel):
Expand All @@ -33,24 +32,6 @@ def write_output(self, output_path: Path, data: list[dict]) -> None:
else:
raise ValueError(f"Unsupported output format: {output_path}")

def get_data(self, elements_filepath: Path) -> list[dict]:
if elements_filepath.suffix == ".json":
with elements_filepath.open() as f:
return json.load(f)
elif elements_filepath.suffix == ".ndjson":
with elements_filepath.open() as f:
return ndjson.load(f)
else:
# Attempt to read it as json
try:
with elements_filepath.open() as f:
logger.warning(
f"File extension mismatch, attempting to read as json: {elements_filepath}"
)
return json.load(f)
except Exception:
raise ValueError(f"Failed to read file: {elements_filepath}")

def conform_dict(self, element_dict: dict, file_data: FileData) -> dict:
return element_dict

Expand Down
3 changes: 2 additions & 1 deletion unstructured_ingest/v2/processes/connectors/duckdb/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import pandas as pd

from unstructured_ingest.utils.data_prep import get_data
from unstructured_ingest.v2.interfaces import FileData, UploadStager
from unstructured_ingest.v2.utils import get_enhanced_element_id

Expand Down Expand Up @@ -79,7 +80,7 @@ def run(
output_filename: str,
**kwargs: Any,
) -> Path:
elements_contents = self.get_data(elements_filepath=elements_filepath)
elements_contents = get_data(path=elements_filepath)
output_path = self.get_output_path(output_filename=output_filename, output_dir=output_dir)

output = [
Expand Down
4 changes: 2 additions & 2 deletions unstructured_ingest/v2/processes/connectors/sql/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from pydantic import BaseModel, Field, Secret

from unstructured_ingest.error import DestinationConnectionError, SourceConnectionError
from unstructured_ingest.utils.data_prep import get_data_df, split_dataframe
from unstructured_ingest.utils.data_prep import get_data, get_data_df, split_dataframe
from unstructured_ingest.v2.constants import RECORD_ID_LABEL
from unstructured_ingest.v2.interfaces import (
AccessConfig,
Expand Down Expand Up @@ -290,7 +290,7 @@ def run(
output_filename: str,
**kwargs: Any,
) -> Path:
elements_contents = self.get_data(elements_filepath=elements_filepath)
elements_contents = get_data(path=elements_filepath)

df = pd.DataFrame(
data=[
Expand Down

0 comments on commit 32eb53e

Please sign in to comment.