Commit 73619b4
fix how hashing occurs to allow reproducibility
rbiseck3 committed Oct 4, 2023
1 parent 65a7cb2 commit 73619b4
Showing 7 changed files with 83 additions and 12 deletions.
2 changes: 2 additions & 0 deletions unstructured/ingest/pipeline/__init__.py
@@ -2,6 +2,7 @@
 from .interfaces import PipelineContext
 from .partition import Partitioner
 from .pipeline import Pipeline
+from .reformat.chunking import Chunker
 from .reformat.embedding import Embedder
 from .source import Reader
 from .write import Writer
@@ -14,4 +15,5 @@
     "PipelineContext",
     "Pipeline",
     "Writer",
+    "Chunker",
 ]
2 changes: 2 additions & 0 deletions unstructured/ingest/pipeline/copy.py
@@ -1,5 +1,6 @@
 import os
 import shutil
+from pathlib import Path

 from unstructured.ingest.connector.registry import create_ingest_doc_from_json
 from unstructured.ingest.logger import logger
@@ -13,5 +14,6 @@ def run(self, json_path: str):
         ingest_doc_json = self.pipeline_config.ingest_docs_map[doc_hash]
         ingest_doc = create_ingest_doc_from_json(ingest_doc_json)
         desired_output = ingest_doc._output_filename
+        Path(desired_output).parent.mkdir(parents=True, exist_ok=True)
         logger.info(f"Copying {json_path} -> {desired_output}")
         shutil.copy(json_path, desired_output)
6 changes: 2 additions & 4 deletions unstructured/ingest/pipeline/interfaces.py
@@ -113,17 +113,15 @@ class PartitionNode(PipelineNode):

     def create_hash(self) -> str:
         hash_dict = self.partition_config.to_dict()
-        hash_dict.update(self.partition_kwargs)
+        hash_dict["partition_kwargs"] = self.partition_kwargs
         return hashlib.sha256(json.dumps(hash_dict, sort_keys=True).encode()).hexdigest()[:32]

     @abstractmethod
     def run(self, json_path: str) -> str:
         pass

     def get_path(self) -> t.Optional[Path]:
-        return (
-            Path(self.pipeline_config.get_working_dir()) / "partitioned" / self.create_hash()
-        ).resolve()
+        return (Path(self.pipeline_config.get_working_dir()) / "partitioned").resolve()


 @dataclass
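Two things change in this hunk: partition_kwargs are hashed under their own key instead of being merged into the config dict (so a kwarg can no longer overwrite a config field before hashing), and get_path no longer buries the node hash in the directory name; the per-file hashing added in partition.py below takes over that job. A minimal standalone sketch of the config-hash idea, with made-up values rather than the repo's config objects:

import hashlib
import json

def node_hash(config: dict, partition_kwargs: dict) -> str:
    # Keep kwargs under a dedicated key so they cannot collide with config fields,
    # and sort keys so the serialized form (and therefore the digest) is stable.
    hash_dict = dict(config)
    hash_dict["partition_kwargs"] = partition_kwargs
    return hashlib.sha256(json.dumps(hash_dict, sort_keys=True).encode()).hexdigest()[:32]

# Hypothetical config values, for illustration only.
print(node_hash({"strategy": "fast"}, {"encoding": "utf-8"}))
print(node_hash({"strategy": "fast"}, {"encoding": "utf-8"}))  # same digest both times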
7 changes: 6 additions & 1 deletion unstructured/ingest/pipeline/partition.py
@@ -1,3 +1,4 @@
+import hashlib
 import json
 from dataclasses import dataclass
 from pathlib import Path
@@ -16,7 +17,11 @@ class Partitioner(PartitionNode):
     def run(self, ingest_doc_json) -> str:
         doc = create_ingest_doc_from_json(ingest_doc_json)
         doc_filename_hash = get_ingest_doc_hash(ingest_doc_json)
-        doc_filename = f"{doc_filename_hash}.json"
+        hashed_filename = hashlib.sha256(
+            f"{self.create_hash()}{doc_filename_hash}".encode(),
+        ).hexdigest()[:32]
+        self.pipeline_config.ingest_docs_map[hashed_filename] = ingest_doc_json
+        doc_filename = f"{hashed_filename}.json"
         json_path = (Path(self.get_path()) / doc_filename).resolve()
         if not self.partition_config.reprocess and json_path.is_file() and json_path.stat().st_size:
             logger.debug(f"File exists: {json_path}, skipping partition")
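The partitioner now names its output after a digest of both its own configuration hash and the per-document hash, and registers that name in ingest_docs_map. Because the name depends only on those two inputs, a rerun computes the same path and the existing "File exists ... skipping partition" branch can take effect. A minimal sketch of the naming scheme, assuming hypothetical hash values rather than the repo's helpers:

import hashlib

def partition_output_name(node_config_hash: str, doc_hash: str) -> str:
    # Same configuration + same document -> the same 32-character name on every run.
    return hashlib.sha256(f"{node_config_hash}{doc_hash}".encode()).hexdigest()[:32] + ".json"

# Hypothetical hashes, for illustration only.
first_run = partition_output_name("cfg" * 8, "doc" * 8)
second_run = partition_output_name("cfg" * 8, "doc" * 8)
assert first_run == second_run  # reproducible, so existing output can be reused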
49 changes: 49 additions & 0 deletions unstructured/ingest/pipeline/reformat/chunking.py
@@ -0,0 +1,49 @@
+import hashlib
+import json
+import os.path
+import typing as t
+from dataclasses import dataclass
+from pathlib import Path
+
+from unstructured.ingest.interfaces import (
+    ChunkingConfig,
+)
+from unstructured.ingest.logger import logger
+from unstructured.ingest.pipeline.interfaces import ReformatNode
+from unstructured.staging.base import convert_to_dict, elements_from_json
+
+
+@dataclass
+class Chunker(ReformatNode):
+    chunking_config: ChunkingConfig
+    reprocess: bool = False
+
+    def create_hash(self) -> str:
+        hash_dict = self.chunking_config.to_dict()
+        return hashlib.sha256(json.dumps(hash_dict, sort_keys=True).encode()).hexdigest()[:32]
+
+    def run(self, elements_json: str) -> str:
+        elements_json_filename = os.path.basename(elements_json)
+        filename_ext = os.path.basename(elements_json_filename)
+        filename = os.path.splitext(filename_ext)[0]
+        hashed_filename = hashlib.sha256(f"{self.create_hash()}{filename}".encode()).hexdigest()[
+            :32
+        ]
+        json_filename = f"{hashed_filename}.json"
+        json_path = (Path(self.get_path()) / json_filename).resolve()
+        self.pipeline_config.ingest_docs_map[
+            hashed_filename
+        ] = self.pipeline_config.ingest_docs_map[filename]
+        if not self.reprocess and json_path.is_file() and json_path.stat().st_size:
+            logger.debug(f"File exists: {json_path}, skipping embedding")
+            return str(json_path)
+        elements = elements_from_json(filename=elements_json)
+        chunked_elements = self.chunking_config.chunk(elements=elements)
+        elements_dict = convert_to_dict(chunked_elements)
+        with open(json_path, "w", encoding="utf8") as output_f:
+            logger.info(f"writing embeddings content to {json_path}")
+            json.dump(elements_dict, output_f, ensure_ascii=False, indent=2)
+        return str(json_path)
+
+    def get_path(self) -> t.Optional[Path]:
+        return (Path(self.pipeline_config.get_working_dir()) / "chunked").resolve()
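Because each reformat node renames its output, the Chunker also re-keys pipeline_config.ingest_docs_map so the new hashed filename still points at the original ingest-doc JSON; that is what lets the final copy step resolve the destination for a file that has been partitioned, chunked, and embedded. A minimal standalone sketch of that re-keying, using a plain dict and hypothetical hash inputs rather than the repo's actual classes:

import hashlib

def stage_rename(docs_map: dict, incoming_name: str, stage_config_hash: str) -> str:
    # Derive this stage's output name from its config hash plus the incoming filename,
    # then point the new key at the same ingest-doc record as the old one.
    new_name = hashlib.sha256(f"{stage_config_hash}{incoming_name}".encode()).hexdigest()[:32]
    docs_map[new_name] = docs_map[incoming_name]
    return new_name

# Hypothetical chain: partitioner output name -> chunker output name -> embedder output name.
docs_map = {"partition_out": '{"ingest_doc": "example"}'}
chunked_name = stage_rename(docs_map, "partition_out", "chunker_cfg_hash")
embedded_name = stage_rename(docs_map, chunked_name, "embedder_cfg_hash")
print(docs_map[embedded_name])  # still resolves to the original ingest-doc JSON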
12 changes: 8 additions & 4 deletions unstructured/ingest/pipeline/reformat/embedding.py
@@ -24,7 +24,13 @@ def create_hash(self) -> str:

     def run(self, elements_json: str) -> str:
         elements_json_filename = os.path.basename(elements_json)
-        json_path = (Path(self.get_path()) / elements_json_filename).resolve()
+        filename_ext = os.path.basename(elements_json_filename)
+        filename = os.path.splitext(filename_ext)[0]
+        hashed_filename = hashlib.sha256(f"{self.create_hash()}{filename}".encode()).hexdigest()[
+            :32
+        ]
+        json_filename = f"{hashed_filename}.json"
+        json_path = (Path(self.get_path()) / json_filename).resolve()
         if not self.reprocess and json_path.is_file() and json_path.stat().st_size:
             logger.debug(f"File exists: {json_path}, skipping embedding")
             return str(json_path)
@@ -38,6 +44,4 @@ def run(self, elements_json: str) -> str:
         return str(json_path)

     def get_path(self) -> t.Optional[Path]:
-        return (
-            Path(self.pipeline_config.get_working_dir()) / "embedded" / self.create_hash()
-        ).resolve()
+        return (Path(self.pipeline_config.get_working_dir()) / "embedded").resolve()
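The embedder follows the same pattern as the partitioner and chunker: the stage hash moves out of the directory name and into the filename, so every stage writes into one flat directory. Roughly, the layout changes like this (illustrative placeholders, not actual hashes):

from pathlib import Path

# Before: the stage's config hash was a subdirectory and only the doc hash named the file.
old_layout = Path("working_dir") / "embedded" / "<config_hash>" / "<doc_hash>.json"
# After: one flat directory per stage; the filename itself mixes config hash and input name.
new_layout = Path("working_dir") / "embedded" / "<sha256(config_hash + filename)[:32]>.json"
print(old_layout)
print(new_layout)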
17 changes: 14 additions & 3 deletions unstructured/ingest/pipeline/sample_pipeline.py
@@ -3,11 +3,13 @@
     SimpleS3Config,
 )
 from unstructured.ingest.interfaces import (
+    ChunkingConfig,
     EmbeddingConfig,
     PartitionConfig,
     ReadConfig,
 )
 from unstructured.ingest.pipeline import (
+    Chunker,
     DocFactory,
     Embedder,
     Partitioner,
@@ -21,7 +23,7 @@
 if __name__ == "__main__":
     pipeline_config = PipelineContext(num_processes=1)
     read_config = ReadConfig(preserve_downloads=True, download_dir="pipeline-test-output")
-    partition_config = PartitionConfig()
+    partition_config = PartitionConfig(strategy="fast")
     page_title = "Open Source Software"
     auto_suggest = False

@@ -42,8 +44,17 @@
     embedder = Embedder(
         pipeline_config=pipeline_config,
         embedder_config=EmbeddingConfig(
-            api_key="FILLIN",
+            api_key="FILL IN",
         ),
+        reprocess=partition_config.reprocess,
     )
+    chunker = Chunker(
+        pipeline_config=pipeline_config,
+        chunking_config=ChunkingConfig(
+            chunk_elements=True,
+            new_after_n_chars=1499,
+        ),
+        reprocess=partition_config.reprocess,
+    )
     writer = Writer(
         pipeline_config=pipeline_config,
@@ -58,7 +69,7 @@
         doc_factory_node=doc_factory,
         source_node=reader,
         partition_node=partitioner,
-        reformat_nodes=[embedder],
+        reformat_nodes=[chunker, embedder],
         write_node=writer,
     )
     pipeline.run()
