Skip to content

Commit

Permalink
update fsspec metadata to include record locator info
Browse files Browse the repository at this point in the history
  • Loading branch information
rbiseck3 committed May 16, 2024
1 parent a961b47 commit 262009b
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 22 deletions.
30 changes: 12 additions & 18 deletions unstructured/ingest/v2/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@
S3DownloaderConfig,
S3Indexer,
S3IndexerConfig,
S3Source,
)
from unstructured.ingest.v2.processes.connectors.local import (
LocalDestination,
LocalUploader,
LocalUploaderConfig,
)
Expand All @@ -27,31 +25,27 @@

if __name__ == "__main__":
connection_config = S3ConnectionConfig(anonymous=True)
source = S3Source(
indexer=S3Indexer(
index_config=S3IndexerConfig(remote_url="s3://utic-dev-tech-fixtures/small-pdf-set/"),
connection_config=connection_config,
),
downloader=S3Downloader(
download_config=S3DownloaderConfig(download_dir=download_path),
connection_config=connection_config,
),
indexer = S3Indexer(
index_config=S3IndexerConfig(remote_url="s3://utic-dev-tech-fixtures/small-pdf-set/"),
connection_config=connection_config,
)
downloader = S3Downloader(
download_config=S3DownloaderConfig(download_dir=download_path),
connection_config=connection_config,
)
partitioner = Partitioner(config=PartitionerConfig(strategy="fast"))
chunker = Chunker(config=ChunkerConfig(chunking_strategy="by_title"))
embedder = Embedder(config=EmbedderConfig(embedding_provider="langchain-huggingface"))
destination = LocalDestination(
uploader=LocalUploader(
upload_config=LocalUploaderConfig(output_directory=str(output_path.resolve()))
)
uploader = LocalUploader(
upload_config=LocalUploaderConfig(output_dir=str(output_path.resolve()))
)
pipeline = Pipeline(
context=ProcessorConfig(work_dir=str(work_dir.resolve())),
indexer=source.indexer,
downloader=source.downloader,
indexer=indexer,
downloader=downloader,
partitioner=partitioner,
chunker=chunker,
embedder=embedder,
uploader=destination.uploader,
uploader=uploader,
)
pipeline.run()
12 changes: 8 additions & 4 deletions unstructured/ingest/v2/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from unstructured.ingest.v2.pipeline.steps.stage import UploadStager, UploadStageStep
from unstructured.ingest.v2.pipeline.steps.uncompress import Uncompressor, UncompressStep
from unstructured.ingest.v2.pipeline.steps.upload import Uploader, UploadStep
from unstructured.ingest.v2.pipeline.utils import sterilize_dict
from unstructured.ingest.v2.processes.connectors.local import LocalUploader


Expand All @@ -25,11 +26,11 @@ class Pipeline:
downloader_step: DownloadStep = field(init=False)
partitioner: InitVar[Partitioner]
partitioner_step: PartitionStep = field(init=False)
chunker: InitVar[Chunker] = None
chunker: InitVar[Optional[Chunker]] = None
chunker_step: ChunkStep = field(init=False, default=None)
embedder: InitVar[Embedder] = None
embedder: InitVar[Optional[Embedder]] = None
embedder_step: EmbedStep = field(init=False, default=None)
stager: InitVar[UploadStager] = None
stager: InitVar[Optional[UploadStager]] = None
stager_step: UploadStageStep = field(init=False, default=None)
uploader: InitVar[Uploader] = field(default=LocalUploader)
uploader_step: UploadStep = field(init=False, default=None)
Expand Down Expand Up @@ -88,7 +89,10 @@ def clean_results(self, results: Optional[list[Union[Any, list[Any]]]]) -> Optio
return final or None

def _run(self):
logger.info(f"Running local pipline: {self}")
logger.info(
f"Running local pipline: {self} with configs: "
f"{sterilize_dict(self.context.to_dict(redact_sensitive=True))}"
)
manager = mp.Manager()
self.context.status = manager.dict()

Expand Down
4 changes: 4 additions & 0 deletions unstructured/ingest/v2/processes/connectors/fsspec/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ def get_metadata(self, path) -> DataSourceMetadata:
date_processed=str(time()),
version=str(version),
url=f"{self.index_config.protocol}://{path}",
record_locator={
"protocol": self.index_config.protocol,
"remote_file_path": self.index_config.remote_url,
},
)

def run(self, **kwargs) -> Generator[FileData, None, None]:
Expand Down
4 changes: 4 additions & 0 deletions unstructured/ingest/v2/processes/connectors/fsspec/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ def get_metadata(self, path) -> DataSourceMetadata:
date_processed=str(time()),
version=version,
url=f"{self.index_config.protocol}://{path}",
record_locator={
"protocol": self.index_config.protocol,
"remote_file_path": self.index_config.remote_url,
},
)


Expand Down

0 comments on commit 262009b

Please sign in to comment.